python3 调用 salt-api

使用python3调用 salt-api

在项目中我们不能使用命令行的模式去调用salt-api,所以我们可以写一个基于salt-api的类,方便项目代码的调用。在这里特别附上两种方式实现的python3版本的salt-api class。

方式一

#python3.x
import pycurl
from io import BytesIO
import json

class PyCurl(object):
    def __init__(self, url, **kwargs):
        # 传入url地址
        self.url = url
        # 取出header相关信息
        self.header = kwargs.get("header", None)
        # 创建一个curl对象
        self.curl = pycurl.Curl()
        # setopt 来设置一些请求选项
        # 指定请求的URL
        self.curl.setopt(self.curl.URL, self.url)
        # 设置代理浏览器
        self.curl.setopt(self.curl.HEADER, False)
        # 设置请求方式
        self.curl.setopt(self.curl.POST, True)
        # 设置https方式
        self.curl.setopt(pycurl.SSL_VERIFYPEER, 0)
        self.curl.setopt(pycurl.SSL_VERIFYHOST, 0)
        # 判断header是否存在
        if self.header:
            # 设置模拟浏览器
            self.curl.setopt(self.curl.HTTPHEADER, self.header)

    def request(self, data=None, timeout=None):
        # 判断对象类型 是否为 str
        if isinstance(data, str):
            #将数据提交
            self.curl.setopt(pycurl.POSTFIELDS, data)
        header_buf = BytesIO()
        body_buf = BytesIO()
        # 强制获取新的连接,即替代缓存中的连接
        self.curl.setopt(self.curl.FRESH_CONNECT, True)
        # 完成交互后强制断开连接,不重用
        self.curl.setopt(self.curl.FORBID_REUSE, True)
        if str(timeout).isdigit() and timeout > 0:
            # 设置timeout超时时间
            self.curl.setopt(self.curl.TIMEOUT, timeout)
        # 将返回的HTTP HEADER定向到回调函数header_buf
        self.curl.setopt(self.curl.HEADERFUNCTION, header_buf.write)
        # 将返回的内容定向到回调函数body_buf
        self.curl.setopt(self.curl.WRITEFUNCTION, body_buf.write)
        try:
            # 服务器返回信息
            self.curl.perform()
        except pycurl.error:
            return False
        # 状态码
        http_code = self.curl.getinfo(self.curl.HTTP_CODE)
        # 关闭连接
        self.curl.close()
        # 返回状态码 header body
        return {"http_code": http_code, "header": header_buf.getvalue(), "body": body_buf.getvalue(), "url": self.url}


class SaltApi(object):

    def __init__(self,**kwargs):

        # 设置超时时间
        self.timeout = kwargs.get("timeout", 300)
        # 设置头信息
        self.header = kwargs.get("header", ["Content-Type:application/json"])
        # 获取url
        self.__url = "https://192.168.104.76:8000"

        # 获取
        self.__username = "salt-api"
        self.__password = "salt-api"

    # token id 获取
    def token_id(self):
        obj = {'eauth': 'pam', 'username': self.__username, 'password': self.__password}
        result = self.post(prefix="/login",**obj)
        if result:
            try:
                self.__token_id = result['return'][0]['token']
            except KeyError:
                raise KeyError
        print(self.__token_id)
        return self.__token_id

    def post(self, prefix="/",token=None,**data):

        # url拼接
        url = self.__url + prefix
        print (data)
        # 实例化
        self.header.append(str(token))
        curl = PyCurl(url, header=self.header)
        # 发起请求
        result = curl.request(data=json.dumps(data), timeout=self.timeout)
        # 判断值
        if not result:
            return result
        #判断状态码是否等于200
        if result["http_code"] != 200:
            self.response = "response code %s".format(result["info"]["http_code"])
            return self.response
        result = json.loads(result["body"].decode())
        # 判断是否有error
        if "error" in result and result["error"]:
            self.response = "%s(%s)" % (result["error"]["data"], result["error"]["code"])
            return self.response
        #返回正确的数据
        return result

    def all_key(self):
        '''
        获取所有的minion_key
        '''
        token = 'X-Auth-Token:%s'%self.token_id()
        obj = {'client': 'wheel', 'fun': 'key.list_all'}
        content = self.post(token=token,**obj)
        # 取出认证已经通过的
        minions = content['return'][0]['data']['return']['minions']
        #print('已认证',minions)
        # 取出未通过认证的
        minions_pre = content['return'][0]['data']['return']['minions_pre']
        # print('未认证',minions_pre)
        return minions,minions_pre

    def accept_key(self,node_name):
        '''
        如果你想认证某个主机 那么调用此方法
        '''
        token = 'X-Auth-Token:%s' % self.token_id()
        obj = {'client': 'wheel', 'fun': 'key.accept','match':node_name}
        content = self.post(token=token,**obj)
        print (content)
        ret = content['return'][0]['data']['success']
        return ret

    # 删除认证方法
    def delete_key(self, node_name):
        obj = {'client': 'wheel', 'fun': 'key.delete', 'match': node_name}
        token = 'X-Auth-Token:%s' % self.token_id()
        content = self.post(token=token, **obj)

        ret = content['return'][0]['data']['success']
        return ret

    # 针对主机远程执行模块
    def host_remote_func(self, tgt, fun):
        ''' tgt是主机 fun是模块
            写上模块名 返回 可以用来调用基本的资产
            例如 curl -k https://ip地址:8080/ \
        >      -H "Accept: application/x-yaml" \
        >      -H "X-Auth-Token:b50e90485615309de0d83132cece2906f6193e43" \
        >      -d client='local' \
        >      -d tgt='*' \
        >      -d fun='test.ping'  要执行的模块
        return:
        - iZ28r91y66hZ: true
          node2.minion: true
        '''
        obj = {'client': 'local', 'tgt': tgt, 'fun': fun}
        token = 'X-Auth-Token:%s' % self.token_id()
        content = self.post(token=token, **obj)
        ret = content['return'][0]
        return ret

    def group_remote_func(self,tgt,fun):
        obj = {'client': 'local', 'tgt': tgt, 'fun': fun,'expr_form': 'nodegroup'}
        token = 'X-Auth-Token:%s' % self.token_id()
        content = self.post(token=token, **obj)
        print (content)
        ret = content['return'][0]
        return ret

    def host_remote_execution_module(self,tgt,fun,arg):
        '执行fun 传入传入参数arg '
        obj = {'client': 'local', 'tgt': tgt, 'fun': fun,'arg': arg}
        token = 'X-Auth-Token:%s' % self.token_id()
        content = self.post(token=token, **obj)
        ret = content['return'][0]
        return ret
        #print(salt_aa.host_remote_execution_module('*', 'cmd.run', 'ifconfig'))

    # 基于分组来执行
    def group_remote_execution_module(self, tgt, fun, arg):
        '''
        根据分组来执行
        tgt =
        '''

        obj = {'client': 'local', 'tgt': tgt, 'fun': fun, 'arg': arg, 'expr_form': 'nodegroup'}
        token = 'X-Auth-Token:%s' % self.token_id()
        content = self.post(token=token, **obj)
        jid = content['return'][0]
        return jid

    def host_sls(self, tgt, arg):
        '''主机进行sls'''
        obj = {'client': 'local', 'tgt': tgt, 'fun': 'state.sls', 'arg': arg}
        token = 'X-Auth-Token:%s' % self.token_id()
        content = self.post(token=token, **obj)
        return content

    def group_sls(self, tgt, arg):
        ''' 分组进行sls '''
        obj = {'client': 'local', 'tgt': tgt, 'fun': 'state.sls', 'arg': arg, 'expr_form': 'nodegroup'}
        token = 'X-Auth-Token:%s' % self.token_id()
        content = self.post(token=token, **obj)
        jid = content['return'][0]['jid']
        return jid

    def host_sls_async(self, tgt, arg):
        '''主机异步sls '''
        obj = {'client': 'local_async', 'tgt': tgt, 'fun': 'state.sls', 'arg': arg}
        token = 'X-Auth-Token:%s' % self.token_id()
        content = self.post(token=token, **obj)
        jid = content['return'][0]['jid']
        return jid

    def group_sls_async(self, tgt, arg):
        '''分组异步sls '''
        obj = {'client': 'local_async', 'tgt': tgt, 'fun': 'state.sls', 'arg': arg, 'expr_form': 'nodegroup'}
        token = 'X-Auth-Token:%s' % self.token_id()
        content = self.post(token=token, **obj)
        jid = content['return'][0]['jid']
        return jid

    def server_group_pillar(self, tgt, arg, **kwargs):
        '''分组进行sls and pillar'''
        obj = {'client': 'local', 'tgt': tgt, 'fun': 'state.sls', 'arg': arg, 'expr_form': 'nodegroup',
               'kwarg': kwargs}
        token = 'X-Auth-Token:%s' % self.token_id()
        content = self.post(token=token, **obj)
        jid = content['return'][0]
        print (jid)

    def server_hosts_pillar(self, tgt, arg,**kwargs):
        '''针对主机执行sls and pillar '''
        obj = {"client": "local", "tgt": tgt, "fun": "state.sls", "arg": arg,"kwarg":kwargs}
        token = 'X-Auth-Token:%s' % self.token_id()
        content = self.post(token=token, **obj)
        jid = content['return'][0]
        return jid

    def jobs_all_list(self):
        '''打印所有jid缓存'''
        token = 'X-Auth-Token:%s' % self.token_id()
        obj = {"client": "runner", "fun": "jobs.list_jobs"}
        content = self.post(token=token, **obj)
        print (content)

    def jobs_jid_status(self, jid):
        '''查看jid运行状态'''
        token = 'X-Auth-Token:%s' % self.token_id()
        obj = {"client": "runner", "fun": "jobs.lookup_jid", "jid": jid}
        content = self.post(token=token, **obj)
        print (content)
        return content
if __name__ == '__main__':

    sa = saltapi.SaltApi()
    print (sa.host_remote_execution_module('node76','cmd.run','ifconfig'))
    print (sa.accept_key("node76"))

方式二

#python3x
import urllib,json
import urllib.request
import urllib.parse
import ssl

ssl._create_default_https_context = ssl._create_unverified_context


class SaltAPI(object):
    __token_id = ''

    def __init__(self,url,user,password):
        self.__url = url
        self.__user = user
        self.__password = password

    def token_id(self):
        """
            用户登陆和获取token
        :return:
        """
        params = {'eauth': 'pam', 'username': self.__user, 'password': self.__password}
        encode = urllib.parse.urlencode(params)
        obj = urllib.parse.unquote(encode).encode('utf-8')
        content = self.postRequest(obj, prefix='/login')
        try:
            self.__token_id = content['return'][0]['token']
        except KeyError:
            raise KeyError

    def postRequest(self,obj,prefix='/'):
        url = self.__url + prefix
        headers = {'X-Auth-Token': self.__token_id}
        req = urllib.request.Request(url, obj, headers)
        opener = urllib.request.urlopen(req)
        content = json.loads(opener.read().decode('utf-8'))
        return content

    def list_all_key(self):
        """
            获取包括认证、未认证salt主机
        """

        params = {'client': 'wheel', 'fun': 'key.list_all'}
        obj = urllib.parse.urlencode(params).encode('utf-8')
        self.token_id()
        content = self.postRequest(obj)
        minions = content['return'][0]['data']['return']['minions']
        minions_pre = content['return'][0]['data']['return']['minions_pre']
        return minions, minions_pre

    def delete_key(self, node_name):
        '''
            拒绝salt主机
        '''

        params = {'client': 'wheel', 'fun': 'key.delete', 'match': node_name}
        obj = urllib.parse.urlencode(params).encode('utf-8')
        self.token_id()
        content = self.postRequest(obj)
        ret = content['return'][0]['data']['success']
        return ret

    def accept_key(self,node_name):
        '''
            接受salt主机
        '''

        params = {'client': 'wheel', 'fun': 'key.accept', 'match': node_name}
        obj = urllib.parse.urlencode(params).encode('utf-8')
        self.token_id()
        content = self.postRequest(obj)
        ret = content['return'][0]['data']['success']
        return ret

    def salt_get_jid_ret(self,jid):
        """
            通过jid获取执行结果
        :param jid: jobid
        :return: 结果
        """
        params = {'client':'runner', 'fun':'jobs.lookup_jid', 'jid': jid}
        obj = urllib.parse.urlencode(params).encode('utf-8')
        self.token_id()
        content = self.postRequest(obj)
        ret = content['return'][0]
        return ret

    def salt_running_jobs(self):
        """
            获取运行中的任务
        :return: 任务结果
        """
        params = {'client':'runner', 'fun': 'jobs.active'}
        obj = urllib.parse.urlencode(params).encode('utf-8')
        self.token_id()
        content = self.postRequest(obj)
        ret = content['return'][0]
        return ret

    def remote_noarg_execution_sigle(self, tgt, fun):
        """
            单台minin执行命令没有参数
        :param tgt: 目标主机
        :param fun:  执行模块
        :return: 执行结果
        """
        params = {'client': 'local', 'tgt': tgt, 'fun': fun}
        obj = urllib.parse.urlencode(params).encode('utf-8')
        self.token_id()
        content = self.postRequest(obj)
        # print(content)
        # {'return': [{'salt-master': True}]}
        ret = content['return'][0]
        return ret

    def remote_execution_single(self, tgt, fun, arg):
        """
            单台minion远程执行,有参数
        :param tgt: minion
        :param fun: 模块
        :param arg: 参数
        :return: 执行结果
        """
        params = {'client': 'local', 'tgt': tgt, 'fun': fun, 'arg': arg}
        obj = urllib.parse.urlencode(params).encode('utf-8')
        self.token_id()
        content = self.postRequest(obj)
        # print(content)
        # {'return': [{'salt-master': 'root'}]}
        ret = content['return']
        return ret

    def remote_async_execution_module(self, tgt, fun, arg):
        """
            远程异步执行模块,有参数
        :param tgt: minion list
        :param fun: 模块
        :param arg: 参数
        :return: jobid
        """
        params = {'client': 'local_async', 'tgt': tgt, 'fun': fun, 'arg': arg, 'expr_form': 'list'}
        obj = urllib.parse.urlencode(params).encode('utf-8')
        self.token_id()
        content = self.postRequest(obj)
        # print(content)
        # {'return': [{'jid': '20180131173846594347', 'minions': ['salt-master', 'salt-minion']}]}
        jid = content['return'][0]['jid']
        return jid

    def remote_execution_module(self, tgt, fun, arg):
        """
            远程执行模块,有参数
        :param tgt: minion list
        :param fun: 模块
        :param arg: 参数
        :return: dict, {'minion1': 'ret', 'minion2': 'ret'}
        """
        params = {'client': 'local', 'tgt': tgt, 'fun': fun, 'arg': arg, 'expr_form': 'list'}
        obj = urllib.parse.urlencode(params).encode('utf-8')
        self.token_id()
        content = self.postRequest(obj)
        # print(content)
        # {'return': [{'salt-master': 'root', 'salt-minion': 'root'}]}
        ret = content['return'][0]
        return ret

    def salt_state(self, tgt, arg, expr_form):
        '''
        sls文件
        '''
        params = {'client': 'local', 'tgt': tgt, 'fun': 'state.sls', 'arg': arg, 'expr_form': expr_form}
        obj = urllib.parse.urlencode(params).encode('utf-8')
        self.token_id()
        content = self.postRequest(obj)
        ret = content['return'][0]
        return ret

    def salt_alive(self, tgt):
        '''
        salt主机存活检测
        '''
        params = {'client': 'local', 'tgt': tgt, 'fun': 'test.ping'}
        obj = urllib.parse.urlencode(params).encode('utf-8')
        self.token_id()
        content = self.postRequest(obj)
        ret = content['return'][0]
        return ret


if __name__ == '__main__':
        salt = SaltAPI(url="https://192.168.104.76:8000",user="salt-api",password="salt-api")
        minions, minions_pre = salt.list_all_key()
        # 说明如果'expr_form': 'list',表示minion是以主机列表形式执行时,需要把list拼接成字符串,如下所示
        minions = ['node76', 'node76']
        hosts = map(str, minions)
        hosts = ",".join(hosts)
        ret = salt.remote_noarg_execution_sigle('node76', 'test.ping')
        print(ret)
        ret = salt.remote_noarg_execution_sigle('node76', 'test.ping')
        print(ret)
        # print(type(ret))

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java Edge

压测软件Jmeter使用实例(WIN7环境)百科我们为什么使用JmeterJmeter安装配置Sampler监听器(Listener)点击启动按钮,开启测试Jmeter自定义变量Redis的压测

2745
来自专栏Linyb极客之路

RESTful API设计指南

在RESTful架构中,每个网址代表一种资源(resource),所以网址中不能有动词,只能有名词,而且所用的名词往往与数据库的表格名对应。一般来说,数据库中的...

853
来自专栏逆向技术

PE文件格式详解,第三讲,可选头文件格式,以及节表

          PE文件格式详解,第三讲,可选头文件格式,以及节表 一丶可选头结构以及作用 typedef struct _IMAGE_OPTIONAL_H...

2219
来自专栏小李刀刀的专栏

[译]Laravel 5.0 之事件调度程序 (定时任务)

本文译自 Matt Stauffer 的系列文章. ---- 关于 Laravel 5.0 的事件调度程序(可以理解为定时任务),Eric Barnes 有一篇...

2644
来自专栏Janti

Thrift教程初级篇——thrift安装环境变量配置第一个实例

前言:                     因为项目需要跨语言,c++客户端,web服务端,远程调用等需求,所以用到了RPC框架Thrift,刚开始有点...

3015
来自专栏F-Stack的专栏

FreeBSD下的工具(sysctl、netstat等)如何移植到F-Stack

在之前的文章中,我们介绍了如何使用DPDK rte_ring来进行多进程的通信,tools/ipc目录就是基于rte_ring实现了一个简单的ipc框架。下面以...

4691
来自专栏24K纯开源

用Qt写软件系列一:QCacheViewer(浏览器缓存查看器)

介绍      Cache技术广泛应用于计算机行业的软硬件领域。该技术既是人们对新技术探讨的结果,也是对当前软硬件计算能力的一种妥协。在浏览器中使用cache技...

1775
来自专栏依乐祝

Ocelot简易教程(四)之请求聚合以及服务发现

上篇文章给大家讲解了Ocelot的一些特性并对路由进行了详细的介绍,今天呢就大家一起来学习下Ocelot的请求聚合以及服务发现功能。希望能对大家有所帮助。

1102
来自专栏linux驱动个人学习

高通 display 驱动【转】

5454
来自专栏Linyb极客之路

使用Jmeter进行http接口性能测试

在进行网页或应用程序后台接口开发时,一般要及时测试开发的接口能否正确接收和返回数据,对于单次测试,Postman插件是个不错的Http请求模拟工具。

1596

扫码关注云+社区