我基本上是想从csv文件中索引数据。
我成功地创建了一个索引。
es.indices.create(index='hash_test', ignore=400)并添加了一个基线索引,其中包含了我的dataframe中包含的列和示例数据。
es.index(index="hash_test", doc_type="hash-test", id=rand_id, body={
'FILENAME': '6.js',
'HASH': 'b4d44ed618112e41cb7e8f33bb19a414',
'DATE': '2018-11-15'})跑得很好。
下面是如何将数据解析为适当的格式,并迭代行并将数据索引到类似于上面的Elasticsearch中。
def index_data(data_path, chunksize, index_name, doc_type):
f = open(data_path)
csvfile = pd.read_csv(f, iterator=True, chunksize=chunksize,sep="£",encoding="utf-8-sig",index_col=0,engine="python")
dictionary = {'Â':''}
es = Elasticsearch('http://*.*.*.*:9200/')
for i,df in enumerate(csvfile):
rand_id = uuid.uuid4();
df.replace(dictionary, regex=True, inplace=True)
df.columns = df.columns.str.replace('Â', '')
records=df.where(pd.notnull(df),None).T.to_dict()
list_records=[records[it] for it in records]
json_data = str(''.join(str(v) for v in list_records))
try:
es.index(index_name, doc_type, rand_id, json_data)
except:
print("error!")
pass我不得不对dataframe进行一些解析,因为每个行和列()中都有一个奇怪的字符。
打印要索引的值时
print(index_name, doc_type, rand_id, json_data)我得到了我想要的
hash_test hash-test 51eacee2-e2b1-4886-82f5-1373ec59c640 {'FILENAME': '6.js', 'HASH': 'b4d44ed618112e41cb7e8f33bb19a414', 'DATE': '2018-11-15'}但是,当我运行它时,我会得到以下错误;
RequestError: RequestError(400, 'mapper_parsing_exception', 'failed to parse')它试图提供以下数据:
{"_index":"hash_test","_type":"hash-test","_id":"{'FILENAME': '8.js', 'HASH': 'b4d44ed618112e41cb7e8f33bb19a414', 'DATE': '2018-11-15'}","found":false}它完全忽略rand_id参数,并且当我执行以下操作时:
es.index(index_name, doc_type, json_data, rand_id)它忽略了json_data参数……
{"_index":"hash_test","_type":"hash-test","_id":"93eadd1b-6859-474b-9750-b618b800b4d5","found":false}我不明白我得到的输出有什么不同,当我指定id参数时,我很难理解主体是如何在_id字段中结束的。
提前为任何帮助干杯。
发布于 2018-11-29 11:27:56
因此,毫不奇怪,我通过从dataframe创建一个干净的JSON字符串,使我需要做的事情复杂化了。我学会了在熊猫中使用to_json函数,而不是使用字典,然后使用列表(我认为这是我错误的来源)。
下面的代码清除了这一点,并将我的数据索引到我的Elasticsearch实例中。
def index_data(data_path, chunksize, index_name, doc_type):
f = open(data_path)
csvfile = pd.read_csv(f, iterator=True, chunksize=chunksize,sep="£",encoding="utf-8-sig",index_col=0,engine="python")
es = Elasticsearch('http://*.*.*.*:9200/')
for i,df in enumerate(csvfile):
rand_id = uuid.uuid4(); #create a random id
data=df.to_json(orient='records', lines=True)
try:
es.index(index=index_name,doc_type=doc_type,id=rand_id,body=data)
except TransportError as e:
print(e.info)https://stackoverflow.com/questions/53524608
复制相似问题