
Python Elasticsearch usage examples

Author: 保持热爱奔赴山海
Published 2023-07-31 14:27:15
Column: 饮水机管理员

This post collects a few demo scripts that may come in handy when automating Elasticsearch (ES) operations.

Create an index and set the number of shards

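# Part of the code is omitted here (including where `configs` is defined)

from elasticsearch import Elasticsearch

src_es = Elasticsearch(hosts=configs.es_source_host, maxsize=16)
dest_es = Elasticsearch(hosts=configs.es_dest_host, maxsize=16)

def create_dest_index():
    # Note: the shard count is set when the index is created and is awkward to change afterwards
    # (changing it later means blocking writes on the index, as the _split/_shrink APIs require,
    # or reindexing into a new index)
    try:
        dest_es.indices.create(
            index=configs.es_dest_index,
            body={"settings": {"index": {"number_of_shards": 4}}},
        )

    except Exception as e:
        print(str(e))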
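Catching every exception, as above, also swallows real errors such as invalid settings or an unreachable cluster. A minimal alternative sketch, assuming the same 7.x-style client (`indices.exists` and `indices.create` with `body=`), checks whether the index exists first; the function name `create_index_if_missing` and the default of one replica are illustrative choices, not part of the original script.

```python
def create_index_if_missing(es, index_name, shards=4, replicas=1):
    """Create index_name with a fixed shard count unless it already exists.

    Returns True if the index was created, False if it was already there.
    `es` is assumed to be an elasticsearch-py 7.x client.
    """
    if es.indices.exists(index=index_name):
        return False
    es.indices.create(
        index=index_name,
        body={
            "settings": {
                "index": {
                    # Fixed at creation time; changing it later needs _split/_shrink or a reindex
                    "number_of_shards": shards,
                    # Unlike the shard count, this can be changed later with put_settings
                    "number_of_replicas": replicas,
                }
            }
        },
    )
    return True
```

Calling it a second time simply returns False. The check and the create are two separate requests, so if another process creates the index in between, `create` can still fail with `resource_already_exists_exception`.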

Adjust index settings

# Part of the code is omitted here (including where `configs` is defined)

from elasticsearch import Elasticsearch

src_es = Elasticsearch(hosts=configs.es_source_host, maxsize=16)
dest_es = Elasticsearch(hosts=configs.es_dest_host, maxsize=16)

def update_dest_index_setting(time_dur, replicas):
    # Unlike the shard count, refresh_interval and number_of_replicas can be changed on a live index
    try:
        res = dest_es.indices.put_settings(
            index=configs.es_dest_index,
            body={"index.refresh_interval": time_dur, "number_of_replicas": replicas},
        )
        print(res)
    except Exception as e:
        print(str(e))
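`refresh_interval` and `number_of_replicas` are typically relaxed only for the duration of a large import and restored afterwards. A small sketch of that pattern as a context manager, assuming the same `indices.put_settings(index=..., body=...)` call as above; `bulk_load_settings` is a hypothetical helper, and its restore defaults (`1s`, one replica) are assumptions, so pass the values your index actually used.

```python
from contextlib import contextmanager


@contextmanager
def bulk_load_settings(es, index_name, restore_interval="1s", restore_replicas=1):
    """Relax index settings for a large bulk load, then restore them.

    refresh_interval "-1" disables periodic refreshes, and 0 replicas avoids
    indexing each document again on every replica during the load.
    """
    es.indices.put_settings(
        index=index_name,
        body={"index.refresh_interval": "-1", "number_of_replicas": 0},
    )
    try:
        yield
    finally:
        # Restore the settings even if the import raised
        es.indices.put_settings(
            index=index_name,
            body={"index.refresh_interval": restore_interval,
                  "number_of_replicas": restore_replicas},
        )
```

Usage would look like `with bulk_load_settings(dest_es, configs.es_dest_index, "30s", 1): ...` around the write loop.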

Generate test data in bulk

# -*- coding: utf-8 -*-

from elasticsearch import Elasticsearch

es = Elasticsearch('http://127.0.0.1:9200/')

index_name = "your_index"

doc_body = {
    "name": "小王",
    "age": 22,
    "sex": "Male",
    "addr":
        {
            "city": "guangzhou",
            "code": 1678533
        }
}

for i in range(5000):
    es.index(index=index_name, id=i, body=doc_body)
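The loop above sends 5000 separate index requests. A sketch of the usual alternative, assuming the `helpers.bulk` function from elasticsearch-py: generate the actions lazily and let the helper send them in chunks. `make_test_actions` is a hypothetical name.

```python
import copy


def make_test_actions(index_name, doc, count):
    """Yield `count` bulk actions for helpers.bulk, using ids 0..count-1.

    Each action gets its own deep copy of `doc`, so later edits to one
    document cannot leak into the others.
    """
    for i in range(count):
        yield {
            "_op_type": "index",  # the default op type, written out for clarity
            "_index": index_name,
            "_id": i,
            "_source": copy.deepcopy(doc),
        }
```

With a real client this would be used as `helpers.bulk(es, make_test_actions(index_name, doc_body, 5000))` (after `from elasticsearch import helpers`); by default the helper sends 500 actions per request.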

Specifying _id in a bulk request

from elasticsearch import Elasticsearch

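# An explicit _id can be set in each action line of the bulk body; the document source follows on the next line

# Create the Elasticsearch client
es = Elasticsearch('http://192.168.1.181:9200/')

# Define the bulk operations: each action line is followed by its document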
bulk_data = [
    {"index": {"_index": "your_index", "_id": 1111}},
    {"name": "小王", "age": 22, "sex": "Male", "addr": {"city": "beijing", "code": 10012}},
    {"index": {"_index": "your_index", "_id": 2222}},
    {"name": "小李", "age": 32, "sex": "Male", "addr": {"city": "shanghai", "code": 10010}},
    {"index": {"_index": "your_index", "_id": 3333}},
    {"name": "小孙", "age": 13, "sex": "Male", "addr": {"city": "guangzhou", "code": 1678533}},
]

# Run the operations with the bulk API
response = es.bulk(index='your_index', body=bulk_data)
# print(response)

# Check the response
if response['errors']:
    for item in response['items']:
        if 'error' in item['index']:
            print(f"Failed operation: {item['index']}")
else:
    print("Bulk operations completed successfully!")
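The error check above only looks at `item['index']`, which works because every action in this body is an `index` action. A sketch that builds the same kind of body from a `{_id: document}` dict and reports failures for any op type (`index`, `create`, `update`, `delete`); both function names are hypothetical.

```python
def build_bulk_body(index_name, docs_by_id):
    """Build the alternating action/document list for es.bulk from {_id: source}."""
    body = []
    for doc_id, source in docs_by_id.items():
        body.append({"index": {"_index": index_name, "_id": doc_id}})
        body.append(source)
    return body


def failed_items(bulk_response):
    """Return (op_type, result) pairs for the bulk items that carry an error.

    Each entry in bulk_response["items"] is keyed by its op type, so this
    does not assume every action was an "index" action.
    """
    failed = []
    for item in bulk_response.get("items", []):
        for op_type, result in item.items():
            if "error" in result:
                failed.append((op_type, result))
    return failed
```

Used together: `response = es.bulk(body=build_bulk_body('your_index', docs))`, then `failed_items(response)` lists what needs retrying.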

Scroll traversal, method 1

# -*- coding: utf-8 -*-
# Passes scroll to es.search directly. This is more verbose, but makes further processing easy
# (e.g. reshape the fetched documents and write them to another index; see scroll查询-并发写入.py)

import time

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

start_ts = time.time()

scroll_time = '5m'  # How long to keep the scroll context alive between requests
index_name = 'index-test1'  # Replace with your index name

query = {
    "query": {
        "match_all": {}  # Match all documents
    }
}

# Initialize the scroll context
response = es.search(index=index_name, scroll=scroll_time, body=query, size=500)
scroll_id = response['_scroll_id']
print("scroll_id -->", scroll_id)

hits = response['hits']['hits']

# Count the documents, to verify the scroll result at the end
count = 0

# Process the first page
for hit in hits:
    _id = hit["_id"]
    _source = hit["_source"]
    print(_id, _source)
    count += 1

# Scroll through the remaining pages until one comes back empty
while len(hits) > 0:
    response = es.scroll(scroll_id=scroll_id, scroll=scroll_time)
    scroll_id = response['_scroll_id']
    hits = response['hits']['hits']

    for hit in hits:
        _id, _source = hit["_id"], hit["_source"]
        print(_id, _source)
        count += 1

    print('------------------------------------------')

# Release the scroll context instead of waiting for it to expire
es.clear_scroll(scroll_id=scroll_id)

stop_ts = time.time()
print('Total documents scrolled:', count, 'elapsed (s):', int(stop_ts - start_ts))
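The same loop can be packaged as a generator that always releases the scroll context, even when the caller stops early or an exception is raised. A minimal sketch, assuming a 7.x-style client with `search`, `scroll` and `clear_scroll` as used above; `iter_scroll` is a hypothetical name.

```python
def iter_scroll(es, index_name, query, size=500, scroll="5m"):
    """Yield every hit of `query`, page by page, and always clear the scroll.

    `es` is assumed to be an elasticsearch-py 7.x client.
    """
    response = es.search(index=index_name, scroll=scroll, body=query, size=size)
    scroll_id = response["_scroll_id"]
    try:
        hits = response["hits"]["hits"]
        while hits:
            for hit in hits:
                yield hit
            response = es.scroll(scroll_id=scroll_id, scroll=scroll)
            scroll_id = response["_scroll_id"]
            hits = response["hits"]["hits"]
    finally:
        # Runs on normal exhaustion, on errors and when the caller stops early
        es.clear_scroll(scroll_id=scroll_id)
```

Usage: `for hit in iter_scroll(es, 'index-test1', query): print(hit['_id'], hit['_source'])`.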

Scroll traversal, method 2

# -*- coding: utf-8 -*-
# The helpers.scan iterator: use this when you only need to read the data out

import time

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

start_ts = time.time()

scroll_duration = '5m'  # How long to keep the scroll context alive between requests
index = 't1'  # Replace with your index name

query = {
    "query": {
        "match_all": {}  # Match all documents
    }
}

count = 0

# helpers.scan opens the scroll itself, fetches `size` hits per request and clears the scroll
# when done, so no separate es.search(..., scroll=...) call is needed (it would only leave an
# unused scroll context open on the server)
for hit in helpers.scan(es, query=query, index=index, scroll=scroll_duration, size=500):
    _id, _source = hit["_id"], hit["_source"]
    print(_id, _source)
    count += 1

stop_ts = time.time()
print(f'Total documents scrolled: {count} elapsed (s): {int(stop_ts - start_ts)}')
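The first method mentions a concurrent variant. Elasticsearch supports sliced scrolls for this: adding `"slice": {"id": i, "max": n}` to the search body splits one scroll into n independent scrolls that can be read in parallel. A sketch that only builds the per-slice bodies; `sliced_queries` is a hypothetical helper.

```python
import copy


def sliced_queries(query, max_slices):
    """Return one query body per slice for a sliced scroll.

    Each body is a copy of `query` plus {"slice": {"id": i, "max": max_slices}};
    together the slices cover every matching document exactly once.
    """
    if max_slices < 2:
        # With a single slice there is nothing to split
        return [copy.deepcopy(query)]
    bodies = []
    for slice_id in range(max_slices):
        body = copy.deepcopy(query)
        body["slice"] = {"id": slice_id, "max": max_slices}
        bodies.append(body)
    return bodies
```

Each body could then be handed to its own worker, for example `helpers.scan(es, query=body, index=index, scroll=scroll_duration)` inside a `concurrent.futures.ThreadPoolExecutor`. The Elasticsearch docs advise not to use more slices than the index has shards.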

Scroll query, then bulk-write the results

# -*- coding: utf-8 -*-
# Read documents from a source index with scroll and bulk-write them (keeping their _id) into a target index
import time

from elasticsearch import Elasticsearch

src_es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])
dest_es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])

start_ts = time.time()

scroll_time = '5m'  # How long to keep the scroll context alive between requests

index_name = 'index-test1'  # Replace with your source index name
dest_index_name = 'index-test2'  # Index to write into

query = {
    "query": {
        "match_all": {}  # Match all documents
    }
}


def build_bulk_body(hits):
    """Turn one page of search hits into a bulk body that keeps each document's _id."""
    body = []
    for hit in hits:
        body.append({"index": {"_index": dest_index_name, "_id": hit["_id"]}})
        body.append(hit["_source"])
    return body


def write_batch(hits):
    """Bulk-write one page of hits to the target index and report failed items."""
    dest_res = dest_es.bulk(index=dest_index_name, body=build_bulk_body(hits))
    if dest_res["errors"]:
        # Inspect the bulk response (dest_res), not the scroll response
        for item in dest_res["items"]:
            if "error" in item["index"]:
                print(f"Failed operation: {item['index']}")
    else:
        print("Bulk operations completed successfully!")


# Count the documents copied, to verify the scroll result at the end
count = 0

# Initialize the scroll context and fetch the first page
response = src_es.search(index=index_name, scroll=scroll_time, body=query, size=1000)
scroll_id = response['_scroll_id']
print("scroll_id -->", scroll_id)
hits = response['hits']['hits']

try:
    # Write the current page, then scroll on until a page comes back empty
    while hits:
        write_batch(hits)
        count += len(hits)
        print('------------------------------------------')

        response = src_es.scroll(scroll_id=scroll_id, scroll=scroll_time)
        scroll_id = response['_scroll_id']
        print("scroll_id ---> ", scroll_id)
        hits = response['hits']['hits']
finally:
    # Release the scroll context instead of waiting for it to expire
    src_es.clear_scroll(scroll_id=scroll_id)

stop_ts = time.time()
print('Total documents scrolled:', count, 'elapsed (s):', int(stop_ts - start_ts))
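The copy can also be built on method 2: let `helpers.scan` manage the scroll and group its hits into bulk bodies. A sketch of the grouping step, assuming hits shaped like the ones above (`_id`, `_source`); `bulk_bodies_from_hits` is a hypothetical name.

```python
from itertools import islice


def bulk_bodies_from_hits(hits, dest_index, chunk_size=1000):
    """Group an iterator of hits into bulk bodies of at most chunk_size documents.

    Each body keeps the source _id, so re-running the copy overwrites
    documents instead of duplicating them.
    """
    it = iter(hits)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        body = []
        for hit in chunk:
            body.append({"index": {"_index": dest_index, "_id": hit["_id"]}})
            body.append(hit["_source"])
        yield body
```

For example: `for body in bulk_bodies_from_hits(helpers.scan(src_es, query=query, index=index_name), dest_index_name): dest_es.bulk(body=body)`. elasticsearch-py also ships `helpers.reindex(client, source_index, target_index, target_client=...)`, which combines scan and bulk in one call.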

ES SQL syntax

# -*- coding: utf-8 -*-
# Reference: https://zhuanlan.zhihu.com/p/341906989
# Querying ES with SQL has limitations: it is less powerful than the native Query DSL, and support for
# nested fields and some functions is weak, but it is good enough for everyday ad-hoc queries.
# Official docs: https://www.elastic.co/guide/en/elasticsearch/reference/current/xpack-sql.html


# Newer ES versions ship with a built-in SQL endpoint

"""
1. Run a query directly with SQL syntax
POST /_sql
{
  "query": "SELECT count(*),k FROM sbtest1 WHERE k>954808 group by k LIMIT 10"
}

2. Translate the SQL into Query DSL
POST /_sql/translate
{
  "query": "SELECT count(*),k FROM sbtest1 WHERE k>954808 group by k LIMIT 10"
}
"""

import json

from elasticsearch import Elasticsearch

es = Elasticsearch(["192.168.1.181:9200"])

# SQL statement (appending "having count(*)>1" would keep only k values that occur more than once)
query_sql = {
    "query": "SELECT count(*),k FROM sbtest1 WHERE k>954808 group by k LIMIT 10"
}

# Example 1: results of running the SQL directly
res = es.sql.query(body=query_sql)
print('Results of the SQL query--->\n', json.dumps(res))

"""
Result:
Results of the SQL query--->
 {"columns": [{"name": "count(*)", "type": "long"}, {"name": "k", "type": "long"}], "rows": [[1, 954846], [1, 954847], [1, 954868], [1, 954875], [1, 954900], [1, 954910], [1, 954923], [1, 954948], [1, 954960], [1, 955017]]}
"""

# List the indices (tables) visible to SQL
query_sql_2 = {
    "query": "SHOW TABLES"
}
res = es.sql.query(body=query_sql_2)
print('SHOW TABLES result--->\n', json.dumps(res))

# Example 2: translate the SQL into Query DSL
res = es.sql.translate(body=query_sql)
print('SQL translated into Query DSL--->\n', json.dumps(res))

"""
Result:
SQL translated into Query DSL--->
 {"size": 0, "query": {"range": {"k": {"from": 954808, "to": null, "include_lower": false, "include_upper": false, "boost": 1.0}}}, "_source": false, "stored_fields": "_none_", "aggregations": {"groupby": {"composite": {"size": 10, "sources": [{"345": {"terms": {"field": "k", "missing_bucket": true, "order": "asc"}}}]}}}}
"""
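A SQL response can be large. When `fetch_size` is set and more rows remain, the response carries a `cursor`, and the next page is fetched by sending `{"cursor": ...}` to the same endpoint; only the first page includes `columns`. A sketch that turns the pages into dicts, assuming the 7.x client's `sql.query(body=...)` and `sql.clear_cursor(body=...)`; `iter_sql_records` is a hypothetical name.

```python
def iter_sql_records(es, sql, fetch_size=1000):
    """Yield each SQL result row as a {column_name: value} dict, following cursors."""
    res = es.sql.query(body={"query": sql, "fetch_size": fetch_size})
    # Column names are only returned with the first page
    names = [col["name"] for col in res["columns"]]
    cursor = res.get("cursor")
    try:
        while True:
            for row in res["rows"]:
                yield dict(zip(names, row))
            if not cursor:
                break  # last page: ES has already released the cursor state
            res = es.sql.query(body={"cursor": cursor})
            cursor = res.get("cursor")
    finally:
        # Only a live cursor if the caller stopped early or an error occurred
        if cursor:
            es.sql.clear_cursor(body={"cursor": cursor})
```

Example: `for rec in iter_sql_records(es, 'SELECT k FROM sbtest1 WHERE k>954808', fetch_size=500): print(rec['k'])`.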

Get and set mappings

# -*- coding: utf-8 -*-

from elasticsearch import Elasticsearch

# Create the Elasticsearch client
es = Elasticsearch([{"host": "127.0.0.1", "port": 9200}])
index_name = "index-test1"  # Existing index whose mapping is copied
new_index_name = "index-test2"  # New index; must differ from index_name


# 1 Create the index and set the shard count (it can only be set here; it cannot be changed in place later)
try:
    es.indices.create(
        index=new_index_name,
        body={"settings": {"index": {"number_of_shards": 4}}},
    )

except Exception as e:
    print(str(e))


# 2 Adjust index settings, e.g. the refresh interval and the number of replicas
try:
    es.indices.put_settings(
        index=new_index_name,
        body={"index.refresh_interval": "60s", "number_of_replicas": 0},
    )
except Exception as e:
    print(str(e))


# 3 Get the mapping of the source index
mapping = es.indices.get_mapping(index=index_name)
mapping_src = mapping[index_name]["mappings"]
# print(mapping_src)


# 4 Apply that mapping to the new index
try:
    res = es.indices.put_mapping(body=mapping_src, index=new_index_name)
    print(res)
except Exception as e:
    print(str(e))
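Steps 1 and 4 can also be combined: creating the new index with both settings and mappings in one request means no document can be indexed with a dynamically guessed mapping in between. A sketch that builds such a body from `get_settings`/`get_mapping` responses; the list of generated settings to drop (`uuid`, `creation_date`, `provided_name`, `version`) is an assumption based on what these responses usually contain and may need extending for your ES version, and `clone_index_body` is a hypothetical name.

```python
import copy

# Per-index settings that ES generates itself; assumed to be rejected by indices.create
GENERATED_SETTINGS = ("uuid", "creation_date", "provided_name", "version")


def clone_index_body(settings_resp, mapping_resp, src_index, shards=None):
    """Build an indices.create body from get_settings/get_mapping responses."""
    index_settings = copy.deepcopy(settings_resp[src_index]["settings"]["index"])
    for key in GENERATED_SETTINGS:
        index_settings.pop(key, None)
    if shards is not None:
        # Choose a new shard count for the copy
        index_settings["number_of_shards"] = shards
    return {
        "settings": {"index": index_settings},
        "mappings": copy.deepcopy(mapping_resp[src_index]["mappings"]),
    }
```

Usage: `es.indices.create(index=new_index_name, body=clone_index_body(es.indices.get_settings(index=index_name), es.indices.get_mapping(index=index_name), index_name, shards=4))`.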
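This article is shared from the author's personal site/blog as part of the Tencent Cloud self-media sharing program.
Originally published: 2023-07-30. For copyright concerns, contact cloudcommunity@tencent.com to have it removed.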
