数据上传与下载

最近更新时间:2023-12-07 14:28:14

我的收藏

1. 直接调用的函数

1.1. 接口列表

download_from_hdfs(hdfs_url, hdfs_path, local_path)
从hdfs上下载文件到本地
:param hdfs_url: webhdfs的地址,如:http://10.0.3.16:4008
:type hdfs_url: str
:param hdfs_path: hdfs上的路径
:type hdfs_path: str
:param local_path: 本地路径
:type local_path: str
:return: 本地的结果路径
:rtype: str

upload_to_hdfs(local_path, hdfs_url, hdfs_path)
上传本地目录到hdfs上
:param local_path: 本地路径
:type local_path: str
:param hdfs_url: webhdfs的地址,如:http://10.0.3.16:4008
:type hdfs_url: str
:param hdfs_path: hdfs上的路径
:type hdfs_path: str
:return: hdfs的结果路径
:rtype: str

upload_to_hive_by_hdfs(local_path, hdfs_url, hive_server, table_name, database='default', auth='CUSTOM', username=None, password=None, overwrite=False, partition='')
把本地文件的数据导入到hive表中,hdfs作为中间存储。
过程:先把本地文件上传到hdfs上,然后再从hdfs文件导入到hive中。
:param local_path: 本地文件或者文件夹。文件夹中不能包含子文件夹。
:type local_path: str
:param hdfs_url: webhdfs的url,如:http://10.0.3.16:4008
:type hdfs_url: str
:param hive_server: HiveServer2的地址
:type hive_server: str
:param table_name: Hive表的名称
:type table_name: str
:param database: 数据库名称
:type database: str
:param auth: 认证的方式
:type auth: str
:param username: 数据库认证的用户名
:type username: str
:param password: 数据库认证的密码
:type password: str
:param overwrite: 是否删掉原来的数据
:type overwrite: bool
:param partition: 分区的选择
:type partition: str
:return:
:rtype:

export_from_hive_by_hdfs(local_path, hdfs_url, hive_server, table_name='', sql='', database='default', auth='CUSTOM', username=None, password=None, row_format="row format delimited fields terminated by ','")
导出hive表到本地上,hdfs作为中间存储。对于大文件,这种方式比直接从hive表写到本地的效率更高。
过程:先把hive导出到hdfs上,然后再从hdfs下载文件到本地。
:param local_path: 本地的目录
:type local_path: str
:param hdfs_url: webhdfs的url,如:http://10.0.3.16:4008
:type hdfs_url: str
:param hive_server: HiveServer2的地址
:type hive_server: str
:param table_name: Hive表的名称。sql设置时忽略此参数
:type table_name: str
:param sql: 查数据sql语句。如:select * from t1
:type sql: str
:param database: 数据库名称
:type database: str
:param auth: 认证的方式
:type auth: str
:param username: 数据库认证的用户名
:type username: str
:param password: 数据库认证的密码
:type password: str
:param row_format: 行的输出格式
:type row_format: str
:return:
:rtype:

1.2. 使用方法:

import tikit

tikit.upload_to_hdfs("dir1/file1", "http://10.0.3.16:4008", "/dir1/file1")

2. 通过client调用的方法

describe_cos_buckets(self)
列出所有bucket
:return: bucket列表
:rtype: dict
返回的结果如:
{
"Owner": {
"ID": "qcs::cam::uin/100011011262:uin/100011011262",
"DisplayName": "100011011162"
},
"Buckets": {
"Bucket": [
{
"Name": "bucket-58565",
"Location": "ap-beijing-fsi",
"CreationDate": "2021-07-21T11:06:00Z",
"BucketType": "cos"
},
{
"Name": "tai-1300158565",
"Location": "ap-guangzhou",
"CreationDate": "2021-10-22T11:04:40Z",
"BucketType": "cos"
}
]
}
}

describe_cos_path(self, bucket, path, maker='', max_keys=1000, encoding_type='')
获取cos的目录的信息。目录下的内容最多显示1000个。要显示目录下的文件和文件夹,请以"/"结尾
:param bucket: cos的桶
:type bucket: str
:param path: 路径
:type path: str
:return: 目录信息
:rtype: dict
:param maker: 从marker开始列出条目
:type maker: str
:param max_keys: 设置单次返回最大的数量,最大为1000.
:type max_keys: int
:param encoding_type: 设置返回结果编码方式,只能设置为url.
:type encoding_type: str

upload_to_cos(self, local_path, bucket, cos_path)
把本地路径下的文件或者目录上传到cos上
:param local_path: 本地路径
:type local_path: str
:param bucket: cos上的桶
:type bucket: str
:param cos_path: cos上的路径
:type cos_path: str
:return: None. 错误信息通过 raise 给出。
:rtype:

download_from_cos(self, bucket, cos_path, local_path)
从cos上下载文件或者目录本地。
注意:本地文件存在会直接覆盖。 cos_path为目录且local_path 为存在的目录的时候,cos_path的文件夹名会作为子目录保留。
:param bucket: cos上的桶
:type bucket: str
:param cos_path: cos上的路径
:type cos_path: str
:param local_path: 本地路径
:type local_path: str
:return: None. 错误信息通过 raise 给出。
:rtype:


delete_cos_path(self, bucket, delete_path)
删除cos目录。要删除目录下的文件和文件夹,请以"/"结尾,即:不带斜杠的,当成文件来删除;带斜杠的,当成文件夹来删除
:param bucket: cos的桶
:type bucket: str
:param delete_path: 要删除的路径
:type delete_path: str

3. 通过HiveInitial调用的方法

3.1 初始化

from tikit.client import Client
from tikit.hive import HiveInitial

client = Client("your_secret_id", "your_secret_key", "<region>")

hive_init = HiveInitial(client)
hive_init.hive_initial("emr-xsjhbhf", "hadoop", "./emr.keytab")

3.2 接口

def spark_hive_initial_wedata(self, wedata_id, source_account=None):
"""为spark初始化wedata hive(调用改方法后,就可以使用spark做hive操作)

:param wedata_id: wedata数据源id
:type wedata_id: int
:param source_account: 如果hive为系统源,则需要传使用账户uin
:type source_account: str
:rtype:
"""
def hive_initial_wedata(self, wedata_id, source_account=None):
"""获取wedata hive连接句柄

:param wedata_id: wedata数据源id
:type wedata_id: int
:param source_account: 如果hive为系统源,则需要传使用账户uin
:type source_account: str
:rtype:
"""

def spark_hive_initial(self, emr_id, username=None, keytab=None):
"""为spark初始化emr hive(调用改方法后,就可以使用spark做hive操作)

:param emr_id: 腾讯云emr id
:type emr_id: str
:param username: 如果使用kerberos认证,则需要传入对应用户名
:type username: str
:param keytab: keytab文件路径。如果使用集群默认账户(如hadoop),则需要提供keytab路径
:type keytab: str
:rtype:
"""

def hive_initial(self, emr_id, username=None, keytab=None):
"""获取emr hive连接句柄

:param emr_id: 腾讯云emr id
:type emr_id: str
:param username: 如果使用kerberos认证,则需要传入对应用户名
:type username: str
:param keytab: keytab文件路径。如果使用集群默认账户(如hadoop),则需要提供keytab路径
:type keytab: str
:rtype:
"""

def hive_initial_custom(
self,
host=None,
port=None,
scheme=None,
username=None,
database='default',
auth=None,
configuration=None,
kerberos_service_name=None,
password=None,
check_hostname=None,
ssl_cert=None,
thrift_transport=None):
"""Connect to HiveServer2

:param host: What host HiveServer2 runs on
:param port: What port HiveServer2 runs on. Defaults to 10000.
:param auth: The value of hive.server2.authentication used by HiveServer2.
Defaults to ``NONE``.
:param configuration: A dictionary of Hive settings (functionally same as the `set` command)
:param kerberos_service_name: Use with auth='KERBEROS' only
:param password: Use with auth='LDAP' or auth='CUSTOM' only
:param thrift_transport: A ``TTransportBase`` for custom advanced usage.
Incompatible with host, port, auth, kerberos_service_name, and password.

The way to support LDAP and GSSAPI is originated from cloudera/Impyla:
https://github.com/cloudera/impyla/blob/255b07ed973d47a3395214ed92d35ec0615ebf62
/impala/_thrift_api.py#L152-L160
"""

def upload_to_wedata_hive(self, wedata_id, local_path, table_name, database='default', overwrite=False,
partition='', source_account=None):
"""上传文件到wedata hive

:param wedata_id: wedata 数据源id
:type wedata_id: int
:param local_path: 本地文件路径
:type local_path: str
:param table_name: 表名
:type local_path: str
:param database: 数据库
:type database: str
:param overwrite: 是否删掉原来的数据
:type overwrite: bool
:param partition: 分区的选择
:type partition: str
:param source_account: 如果hive为系统源,则需要传使用账户uin
:type source_account: str
:rtype:
"""

def export_from_wedata_hive(self, wedata_id, local_path, table_name='', database='default', sql='',
row_format="row format delimited fields terminated by ','", source_account=None):
"""导出wedata hive数据到本地

:param wedata_id: wedata 数据源id
:type wedata_id: int
:param local_path: 本地文件路径
:type local_path: str
:param table_name: 表名
:type local_path: str
:param database: 数据库
:type database: str
:param sql: 查数据sql语句。如:select * from t1
:type sql: str
:param row_format: 行的输出格式
:type row_format: str
:param source_account: 如果hive为系统源,则需要传使用账户uin
:type source_account: str
:rtype:
"""