专栏首页运维Python Elasticsearch API操作ES集群

Python Elasticsearch API操作ES集群

环境

  • Centos 7.4
  • Python 2.7
  • Pip 2.7 MySQL-python 1.2.5 Elasticsearc 6.3.1
  • Elasitcsearch6.3.2

知识点

  • 调用Python Elasticsearh API
  • Python Mysqldb使用
  • DSL查询与聚合
  • Python 列表操作

代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#minyt 2018.9.1
#获取24小时内出现的模块次数
# 该程序通过elasticsearch python client 获取相关精简数据,可以计算请求数、超时数、错误数、正确率、错误率等等
import MySQLdb
from elasticsearch import Elasticsearch
from elasticsearch import helpers

#定义elasticsearch集群索引名
index_name = "logstash-nginxlog-*"

#实例化Elasticsearch类,并设置超时间为180秒,默认是10秒的,如果数据量很大,时间设置更长一些
es = Elasticsearch(['elasticsearch01','elasticsearch02','elasticsearch03'],timeout=180)

#DSL(领域特定语言)查询语法,查询top50 sname的排列次数
data_sname = {
  "aggs": {
    "2": {
      "terms": {
        "field": "apistatus.sname.keyword",
        "size": 100,
        "order": {
          "_count": "desc"
        }
      }
    }
  },
  "size": 0,
  "_source": {
    "excludes": []
  },
  "stored_fields": [
    "*"
  ],
  "script_fields": {},
  "docvalue_fields": [
    "@timestamp"
  ],
  "query": {
    "bool": {
      "must": [
        {
          "match_all": {}
        },
        {
          "range": {
            "@timestamp": {
              "gte" : "now-24h/h",
              "lt" :  "now/h"
            }
          }
        }
      ],
      "filter": [],
      "should": [],
      "must_not": []
    }
  }
}

#按照DSL(特定领域语言)语法查询获取数据
def get_original_data():
    try:
        #根据上面条件搜索数据
        res = es.search(
            index=index_name,
            size=0,
            body=data_sname
        )
        return res

    except:
        print "get original data failure"

#初始化数据库
def init_mysql():
    # 打开数据库连接
    db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' )

    # 使用cursor()方法获取操作游标 
    cursor = db.cursor()

    # SQL 更新语句
    sql = "update appname set count=0"
    try:
        # 执行SQL语句
        cursor.execute(sql)
        # 提交到数据库执行
        db.commit()
    except:
        # 发生错误时回滚
        db.rollback()

    # 关闭数据库连接
    db.close()

def updata_mysql(sname_count,sname_list):
    # 打开数据库连接
       db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' )

    # 使用cursor()方法获取操作游标 
    cursor = db.cursor()

    # SQL 更新语句
    sql = "update appname set count=%d where sname = '%s'" % (sname_count,sname_list)
    try:
        # 执行SQL语句
        cursor.execute(sql)
        # 提交到数据库执行
        db.commit()
    except:
        # 发生错误时回滚
        db.rollback()

    # 关闭数据库连接
    db.close()

#根据Index数据结构通过Elasticsearch Python Client上传数据到新的Index
def import_process_data():
    try:
        #列表形式显示结果
        res = get_original_data()
        #print res
        res_list = res.get('aggregations').get('2').get('buckets')
        #print res_list

        #初始化数据库
        init_mysql()

        #获取24小时内出现的SNAME 
        for value in res_list:
            sname_list = value.get('key')
            sname_count = value.get('doc_count')
            print sname_list,sname_count
            #更新sname_status值
            updata_mysql(sname_count,sname_list)

    except Exception, e:
        print repr(e)

if __name__ == "__main__":
    import_process_data()

总结

关键是DSL语法的编写涉及查询与聚合可以通过kibana的visualize或者devtool先测试出正确语法,然后结合python对列表、字典、除法、字符串等操作即可。下面汇总下各个算法:

  • 总请求 http_host.keyword: api.mydomain.com
  • 超长请求 http_host.keyword: api.mydomain.com AND request_time: [1 TO 600] NOT apistatus.status.keyword:*错误
  • 错误请求 apistatus.status.keyword:*错误 AND (http_host.keyword: api.mydomain.com OR http_host.keyword: api.yourdomain.com )
  • 请求健康度 域名与request_time聚合,域名请求时间小于3秒的次数除以总请求次数对应各个域名健康度
  • 请求正确率 域名与http状态码聚合,域名http状态码为200的次数除以域名总请求数对应各个域名的请求正确率

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • kubernetes1.13.1集群集成harbor-helm

    https://github.com/goharbor/harbor-helm https://www.hi-linux.com/posts/14136.htm...

    三杯水Plus
  • linux文件树

    以前有意找这方面的资料,今天突然发现在系统中就有 linux系统用man hier solaris用man  filesystem 其结果如下     ...

    三杯水Plus
  • Gitlab通过控制台重置密码及解锁用户

    操作背景 Gitlab是Docker部署,Jenkins账号登陆不了,开始是怀疑密码不对,通过控制台登陆重置了密码,还是登陆不了,怀疑是Jenkins用户被锁住...

    三杯水Plus
  • Python爬虫的一次提问,引发的“乱码”问题

    近日,有位小伙伴向我请教,在爬取某网站时,网页的源代码出现了中文乱码问题。之前关于爬虫乱码有很多粉丝的各式各样的问题,今天恋习Python与大家一起总结下关于网...

    一墨编程学习
  • “ 一网打尽 ” 二进制、格雷码、独热码编码方式

    在一组数的编码中,若任意两个相邻的代码只有一位二进制数不同,则称这种编码为格雷码(Gray Code),另外由于最大数与最小数之间也仅一位数不同,即“首尾相连...

    数字芯片社区
  • Ubuntu ssh 安卓手机

    http://blog.csdn.net/u013752202/article/details/53648823

    zhangrelay
  • 基础知识 | 使用 Python 将数据写到 CSV 文件

    我们从网上爬取数据,最后一步会考虑如何存储数据。如果数据量不大,往往不会选择存储到数据库,而是选择存储到文件中,例如文本文件、CSV 文件、xls 文件等。因为...

    猴哥yuri
  • 那天晚上和@FeignClient注解的深度交流

    主要还是在技术群里看到有同学在问相关问题,比如: contextId 是干嘛的?name 相同的多个 Client 会报错?

    猿天地
  • 那天晚上和@FeignClient注解的深度交流

    主要还是在技术群里看到有同学在问相关问题,比如: contextId 是干嘛的?name 相同的多个 Client 会报错?

    黄泽杰
  • Akka 指南 之「分布式数据」

    为了使用分布式数据(Distributed Data),你需要将以下依赖添加到你的项目中:

    CG国斌

扫码关注云+社区

领取腾讯云代金券