我有一个足够大的数据集,可以在AWS OpenSearch中批量索引JSON对象。
我看不出如何使用以下任何一种方法来实现这一点: boto3、awswrangler、opensearch-py、elasticsearch、elasticsearch-py。
有没有一种不直接使用python请求(PUT/POST)的方法来做到这一点?
请注意,这不是用于: ElasticSearch,AWS ElasticSearch。
非常感谢!
发布于 2022-06-09 11:19:24
我终于找到了一种使用opensearch-py实现它的方法,如下所示。
首先建立客户,
# First fetch credentials from environment defaults
# If you can get this far you probably know how to tailor them
# For your particular situation. Otherwise SO is a safe bet :)
import boto3
credentials = boto3.Session().get_credentials()
region='eu-west-2' # for example
auth = AWSV4SignerAuth(credentials, region)
# Now set up the AWS 'Signer'
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
auth = AWSV4SignerAuth(credentials, region)
# And finally the OpenSearch client
host=f"...{region}.es.amazonaws.com" # fill in your hostname (minus the https://) here
client = OpenSearch(
hosts = [{'host': host, 'port': 443}],
http_auth = auth,
use_ssl = True,
verify_certs = True,
connection_class = RequestsHttpConnection
)呼!现在让我们创建数据:
# Spot the deliberate mistake(s) :D
document1 = {
"title": "Moneyball",
"director": "Bennett Miller",
"year": "2011"
}
document2 = {
"title": "Apollo 13",
"director": "Richie Cunningham",
"year": "1994"
}
data = [document1, document2]小费!如果你需要创建索引-
my_index = 'my_index'
try:
response = client.indices.create(my_index)
print('\nCreating index:')
print(response)
except Exception as e:
# If, for example, my_index already exists, do not much!
print(e)这是事情有点疯狂的地方。我还没有意识到每一个大容量的动作都需要一个,呃,action,例如“索引”,“搜索”等等。
action={
"index": {
"_index": my_index
}
}您可以在那里阅读有关散装 REST的所有内容。
下一个奇怪之处是,OpenSearch大容量API需要新行分隔的JSON (参见https://www.ndjson.org),它基本上是将JSON序列化为字符串,并由换行符分隔。有人在上面写道,这个“奇怪的”API看起来就像一个数据科学家设计的API--我认为这不是冒犯,而是石头。(我同意ndjson很奇怪。)
可怕的是,现在让我们构建完整的JSON字符串,将数据和操作结合起来。一名新军助手就在手边!
def payload_constructor(data,action):
# "All my own work"
action_string = json.dumps(action) + "\n"
payload_string=""
for datum in data:
payload_string += action_string
this_line = json.dumps(datum) + "\n"
payload_string += this_line
return payload_string好,现在我们终于可以调用大容量API了。我想你可以混合各种行动(超出了这里的范围)-去吧!
response=client.bulk(body=payload_constructor(data,action),index=my_index)这可能是史上最无聊的笑话,但你有。
您还可以让(geddit) .bulk()只使用index=并将操作设置为:
action={"index": {}}嘿,普雷托!
现在,选择你的毒药-另一个解决方案看起来更短,更整洁。
PS,隐藏良好的opensearch-py文档位于这里。
发布于 2022-06-08 23:56:37
conn = wr.opensearch.connect(
host=self.hosts, # URL
port=443,
username=self.username,
password=self.password
)
def insert_index_data(data, index_name='stocks', delete_index_data=False):
""" Bulk Create
args: body [{doc1}{doc2}....]
"""
if delete_index_data:
index_name = 'symbol'
self.delete_es_index(index_name)
resp = wr.opensearch.index_documents(
self.conn,
documents=data,
index=index_name
)
print(resp)
return resp发布于 2022-07-02 07:50:06
我使用下面的代码将从postgres插入的记录大容量插入到OpenSearch (es7.2)
import sqlalchemy as sa
from sqlalchemy import text
import pandas as pd
import numpy as np
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk
import json
engine = sa.create_engine('postgresql+psycopg2://postgres:postgres@127.0.0.1:5432/postgres')
host = 'search-xxxxxxxxxx.us-east-1.es.amazonaws.com'
port = 443
auth = ('username', 'password') # For testing only. Don't store credentials in code.
# Create the client with SSL/TLS enabled, but hostname verification disabled.
client = OpenSearch(
hosts = [{'host': host, 'port': port}],
http_compress = True,
http_auth = auth,
use_ssl = True,
verify_certs = True,
ssl_assert_hostname = False,
ssl_show_warn = False
)
with engine.connect() as connection:
result = connection.execute(text("select * from account_1_study_1.stg_pred where domain='LB'"))
records = []
for row in result:
record = dict(row)
record.update(record['item_dataset'])
del record['item_dataset']
records.append(record)
df = pd.DataFrame(records)
#df['Date'] = df['Date'].astype(str)
df = df.fillna("null")
print(df.keys)
documents = df.to_dict(orient='records')
#bulk(es ,documents, index='search-irl-poc-dump', raise_on_error=True)\
#response=client.bulk(body=documents,index='sample-index')
bulk(client, documents, index='search-irl-poc-dump', raise_on_error=True, refresh=True)https://stackoverflow.com/questions/72544983
复制相似问题