
How do I bulk upload JSON records to an AWS OpenSearch index using a Python library?

Stack Overflow user
Asked on 2022-06-08 11:32:25
Answers: 3 · Views: 3K · Followers: 0 · Score: 2

I have a dataset that is large enough that I need to bulk index the JSON objects into AWS OpenSearch.

I cannot see how to do this with any of the following: boto3, awswrangler, opensearch-py, elasticsearch, elasticsearch-py.

Is there a way to do this without using Python requests (PUT/POST) directly?

Note that this is not for: ElasticSearch, AWS ElasticSearch.

Many thanks!


3 Answers

Stack Overflow user

Accepted answer

Posted on 2022-06-09 11:19:24

I eventually found a way to do it using opensearch-py, as follows.

First, establish the client:

Code language: Python
# First fetch credentials from environment defaults
# If you can get this far you probably know how to tailor them
# for your particular situation. Otherwise SO is a safe bet :)
import boto3
credentials = boto3.Session().get_credentials()
region = 'eu-west-2'  # for example

# Now set up the AWS 'Signer'
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
auth = AWSV4SignerAuth(credentials, region)

# And finally the OpenSearch client
host=f"...{region}.es.amazonaws.com" # fill in your hostname (minus the https://) here
client = OpenSearch(
    hosts = [{'host': host, 'port': 443}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)
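
Before going any further it is worth checking that the client can actually reach the domain; a minimal sketch using nothing beyond the client built above:

Code language: Python
# Optional sanity check: prints the cluster name and engine version
# if the SigV4-signed connection works.
info = client.info()
print(info['cluster_name'], info['version']['number'])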

Phew! Now let's create the data:

Code language: Python
# Spot the deliberate mistake(s) :D
document1 = {
    "title": "Moneyball",
    "director": "Bennett Miller",
    "year": "2011"
}

document2 = {
    "title": "Apollo 13",
    "director": "Richie Cunningham",
    "year": "1994"
}

data = [document1, document2]

Tip! If you need to create the index first:

Code language: Python
my_index = 'my_index'

try:
    response = client.indices.create(my_index)
    print('\nCreating index:')
    print(response)
except Exception as e:
    # If, for example, my_index already exists, do nothing much!
    print(e)
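
If you'd rather not lean on the exception, a minimal alternative (same client and my_index as above) is to ask the client whether the index exists first:

Code language: Python
# Alternative: only create the index when it is not already there.
if not client.indices.exists(my_index):
    print(client.indices.create(my_index))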

This is where things get a bit crazy. I hadn't realised that every bulk operation needs an, er, action, e.g. "index", "create", "delete" and so on.

Code language: Python
action={
    "index": {
        "_index": my_index
    }
}

You can read all about the bulk REST API in the OpenSearch documentation.

The next quirk is that the OpenSearch bulk API requires newline-delimited JSON (see https://www.ndjson.org), which is basically JSON serialized to strings and separated by newlines. Someone on SO wrote that this "weird" API looks like one designed by a data scientist; far from being an insult, I reckon that's spot on. (I agree that ndjson is weird.)
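
For the two example documents above, the bulk payload ends up looking something like this (an action line, then its document line, each terminated by a newline):

{"index": {"_index": "my_index"}}
{"title": "Moneyball", "director": "Bennett Miller", "year": "2011"}
{"index": {"_index": "my_index"}}
{"title": "Apollo 13", "director": "Richie Cunningham", "year": "1994"}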

Scarily, let's now build the full JSON string, combining the data and the action. A helper function is at hand!

Code language: Python
import json

def payload_constructor(data, action):
    # "All my own work"

    action_string = json.dumps(action) + "\n"

    payload_string = ""

    for datum in data:
        payload_string += action_string
        this_line = json.dumps(datum) + "\n"
        payload_string += this_line
    return payload_string

OK, now we can finally call the bulk API. I suppose you could mix in all manner of actions (beyond the scope here), so go for it!

Code language: Python
response = client.bulk(body=payload_constructor(data, action), index=my_index)

That's possibly the most boring punchline ever, but there you have it.
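
The call returns the usual bulk response body; here is a minimal sketch of checking it for per-item failures (assuming the standard bulk response shape with "errors" and "items"):

Code language: Python
# The bulk endpoint returns HTTP 200 even when individual items fail,
# so check the "errors" flag and per-item statuses.
if response.get("errors"):
    for item in response["items"]:
        result = item.get("index", {})
        if result.get("status", 0) >= 300:
            print("failed:", result)
else:
    print(f"indexed {len(response['items'])} documents")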

You can also have (geddit?) .bulk() just use index= and set the action to:

Code language: Python
action={"index": {}}

Hey presto!
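
Put together, that variant would look something like this (same client, payload_constructor, data and my_index as above):

Code language: Python
# With an empty "index" action, the target index comes from the index= argument.
action = {"index": {}}
response = client.bulk(body=payload_constructor(data, action), index=my_index)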

Now, pick your poison - the other solution looks shorter and neater.

PS The well-hidden opensearch-py docs are here.

Score 6

Stack Overflow user

Posted on 2022-06-08 23:56:37

Code language: Python
import awswrangler as wr

# Note: the self.* attributes below come from the answerer's own class/config;
# substitute your own host, username and password.
conn = wr.opensearch.connect(
    host=self.hosts,  # URL
    port=443,
    username=self.username,
    password=self.password
)

def insert_index_data(data, index_name='stocks', delete_index_data=False):
    """Bulk create.
       args: data [{doc1}, {doc2}, ...]
    """
    if delete_index_data:
        index_name = 'symbol'
        self.delete_es_index(index_name)

    resp = wr.opensearch.index_documents(
        conn,
        documents=data,
        index=index_name
    )
    print(resp)
    return resp
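
For a standalone version of the same idea, here is a minimal sketch using awswrangler directly; the endpoint, credentials, documents and index name below are illustrative placeholders:

Code language: Python
import awswrangler as wr

# Hypothetical endpoint and credentials - replace with your own.
client = wr.opensearch.connect(
    host='my-domain.eu-west-2.es.amazonaws.com',
    port=443,
    username='my_user',
    password='my_password'
)

documents = [
    {"title": "Moneyball", "year": "2011"},
    {"title": "Apollo 13", "year": "1994"},
]

# index_documents takes care of the bulk request (and NDJSON framing) for you.
response = wr.opensearch.index_documents(client, documents=documents, index='movies')
print(response)
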
Score 0

Stack Overflow user

Posted on 2022-07-02 07:50:06

I used the code below to bulk insert records pulled from Postgres into OpenSearch (ES 7.2).

Code language: Python
import sqlalchemy as sa
from sqlalchemy import text
import pandas as pd
import numpy as np
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk
import json

engine = sa.create_engine('postgresql+psycopg2://postgres:postgres@127.0.0.1:5432/postgres')

host = 'search-xxxxxxxxxx.us-east-1.es.amazonaws.com'
port = 443
auth = ('username', 'password') # For testing only. Don't store credentials in code.

# Create the client with SSL/TLS enabled, but hostname verification disabled.
client = OpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_compress = True,
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    ssl_assert_hostname = False,
    ssl_show_warn = False
)


with engine.connect() as connection:
    result = connection.execute(text("select * from account_1_study_1.stg_pred where domain='LB'"))
    records = []
    for row in result:
        record = dict(row)
        # Flatten the nested 'item_dataset' JSON column into the top-level record
        record.update(record['item_dataset'])
        del record['item_dataset']
        records.append(record)
    df = pd.DataFrame(records)
    df = df.fillna("null")
    print(df.keys())
    documents = df.to_dict(orient='records')

    # The opensearch-py bulk helper builds the NDJSON payload and sends it in one go
    bulk(client, documents, index='search-irl-poc-dump', raise_on_error=True, refresh=True)
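
The helpers.bulk call above passes plain document dicts plus an index= keyword; the helper also accepts actions that carry their own metadata, which is handy when documents need explicit IDs. A minimal sketch (the index name and IDs are illustrative):

Code language: Python
from opensearchpy.helpers import bulk

# Each action names its own target index, document id and source body.
actions = [
    {"_index": "search-irl-poc-dump", "_id": str(i), "_source": doc}
    for i, doc in enumerate(documents)
]
success, errors = bulk(client, actions, raise_on_error=False)
print(f"indexed={success}, errors={errors}")
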
Score 0
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/72544983
