前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据ETL实践探索(4)---- 搜索神器Elastic search

大数据ETL实践探索(4)---- 搜索神器Elastic search

作者头像
流川疯
发布2019-01-18 15:36:03
9890
发布2019-01-18 15:36:03
举报


大数据ETL 系列文章简介

本系列文章主要针对ETL大数据处理这一典型场景,基于python语言使用Oracle、aws、Elastic search 、Spark 相关组件进行一些基本的数据导入导出实战,如:

  • oracle使用数据泵impdp进行导入操作。
  • aws使用awscli进行上传下载操作。
  • 本地文件上传至aws es
  • spark dataframe录入ElasticSearch

等典型数据ETL功能的探索。

系列文章:

1.大数据ETL实践探索(1)---- python 与oracle数据库导入导出

2.大数据ETL实践探索(2)---- python 与aws 交互

3.大数据ETL实践探索(3)---- pyspark 之大数据ETL利器

4.大数据ETL实践探索(4)---- 之 搜索神器elastic search

5.使用python对数据库,云平台,oracle,aws,es导入导出实战

6.aws ec2 配置ftp----使用vsftp

7.浅谈pandas,pyspark 的大数据ETL实践经验


本地文件导入aws Elastic search

网络配置

修改访问策略,设置本地电脑的公网ip,这个经常会变化,每次使用时候需要设置一下

环境配置

安装anancota

https://www.anaconda.com/download/

初始化环境,win10下打开Anaconda Prompt 的命令行

代码语言:javascript
复制
conda create -n elasticsearch python=3.6
source activate elasticsearch 
pip install elasticsearch
pip install pandas

数据录入

如果突然来了一批非常大的数据要录入到Elastic search 中怎么办。

使用脚本如下:windows获取当前文件夹下所有csv并转换成pandas 的dataframe建立索引录入Elastic search

代码语言:javascript
复制
# 有问题的并行数据录入代码
from elasticsearch import helpers, Elasticsearch
import pandas as pd
from time import time

import win_unicode_console
win_unicode_console.enable()

import os  
  
def file_name(file_dir):   
    for root, dirs, files in os.walk(file_dir):  
        print(root) #当前目录路径  
        print(dirs) #当前路径下所有子目录  
        print(files) #当前路径下所有非目录子文件  
        return [item for item in files if '.csv' in item]


root_path=os.getcwd()+'\\'

fileslist = file_name(root_path)

# size of the bulk
chunksize=50000

for file in fileslist:
    t0=time()
    f = open(root_path+file,'r', encoding='UTF-8') # read csv

    # 使用 pandas 解析csv
    csvfile=pd.read_csv(f, iterator=True, chunksize=chunksize,low_memory=False) 

    # 初始化es
    es = Elasticsearch(["https://yoururl.amazonaws.com.cn"])

    # 初始化索引
    try :
        es.indices.delete(file.strip('.csv').lower())
    except :
        pass

    es.indices.create(file.strip('.csv').lower())

    # start bulk indexing 
    print("now indexing %s..."%(file))

    for i,df in enumerate(csvfile): 
        print(i)
        records=df.where(pd.notnull(df), None).T.to_dict()
        list_records=[records[it] for it in records]
        try :
            helpers.parallel_bulk(es, list_records, index=file.strip('.csv').lower(), doc_type=file.strip('.csv').lower(),thread_count=8)
        except :
            print("error!, skip records...")
            pass

    print("done in %.3fs"%(time()-t0))

并行录入

上一段代码发现,数据录入es时候有问题,由于并行录入是懒加载的模式,所以数据居然没录进去,按照下面链接提供的思路,代码需要如下修改:

代码实例:

https://www.programcreek.com/python/example/104891/elasticsearch.helpers.parallel_bulk

参考帖子:

https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498

代码语言:javascript
复制
from elasticsearch import helpers, Elasticsearch
import pandas as pd
from time import time

from elasticsearch.helpers import BulkIndexError
from elasticsearch.exceptions import TransportError,ConnectionTimeout,ConnectionError
import traceback  
import logging
logging.basicConfig(filename='log-for_.log',
                    format='%(asctime)s -%(name)s-%(levelname)s-%(module)s:%(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S %p',
                    level=logging.ERROR)


import win_unicode_console
win_unicode_console.enable()

import os  
  
def file_name(file_dir):   
    for root, dirs, files in os.walk(file_dir):  
        print(root) #当前目录路径  
        print(dirs) #当前路径下所有子目录  
        print(files) #当前路径下所有非目录子文件  
        return [item for item in files if '.csv' in item]

#NAME = "PV_PROV_LOG"
root_path=os.getcwd()+'\\'
#csv_filename="%s.csv" % NAME



fileslist = file_name(root_path)

# size of the bulk
chunksize=1000

for file in fileslist:

    t0=time()
    # open csv file
    
    f = open(root_path+file,'r', encoding='UTF-8') # read csv

    # parse csv with pandas
    csvfile=pd.read_csv(f, iterator=True, chunksize=chunksize,low_memory=False) 

    # init ElasticSearch
    es = Elasticsearch(["..."])

    # init index
    try :
        es.indices.delete(file.strip('.csv').lower())
    except :
        pass

    es.indices.create(file.strip('.csv').lower())

    # start bulk indexing 
    print("now indexing %s..."%(file))

    for i,df in enumerate(csvfile): 
        print(i)
        records=df.where(pd.notnull(df), None).T.to_dict()
        list_records=[records[it] for it in records]
        #print(list_records)
        try :
            #helpers.bulk(es, list_records, index=file.strip('.csv').lower(), doc_type=file.strip('.csv').lower())
            for success, info in helpers.parallel_bulk(es, list_records, index=file.strip('.csv').lower(), doc_type=file.strip('.csv').lower(),thread_count=8):
                if not success:
                    print('A document failed:', info)
            #helpers.parallel_bulk(es, list_records, index=file.strip('.csv').lower(), doc_type=file.strip('.csv').lower(),thread_count=8)
        except ConnectionTimeout:
            logging.error("this is ES ConnectionTimeout ERROR \n %s"%str(traceback.format_exc()))
            logging.info('retry bulk es')
        except TransportError:
            
            logging.error("this is ES TransportERROR \n %s"%str(traceback.format_exc()))
            logging.info('retry bulk es')
        except ConnectionError:
            logging.error("this is ES ConnectionError ERROR \n %s"%str(traceback.format_exc()))
            logging.info('retry bulk es')
            
        except BulkIndexError:
            logging.error("this is ES BulkIndexError ERROR \n %s"%str(traceback.format_exc()))
            logging.info('retry bulk es')
            pass
        except Exception:
            logging.error("exception not match \n %s"%str(traceback.format_exc()))
            logging.error('retry bulk es')
            pass
        except :
            print("error!, skiping some records")
            print (list_records)
            print(json.loads(result))
            pass

    print("done in %.3fs"%(time()-t0))

Elastic search 基本命令

使用curl 命令发送查询请求

代码语言:javascript
复制
#获取索引记录条数
curl -X GET 172.31.45.69:9200/index/doc/_count

结果

代码语言:javascript
复制
{"count":155000,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0}}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年12月09日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 大数据ETL 系列文章简介
  • 本地文件导入aws Elastic search
    • 网络配置
      • 环境配置
      • 数据录入
        • 并行录入
        • Elastic search 基本命令
        相关产品与服务
        Elasticsearch Service
        腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档