首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何显式声明气流连接的charset=utf8

如何显式声明气流连接的charset=utf8
EN

Stack Overflow用户
提问于 2017-09-07 05:39:14
回答 3查看 3K关注 0票数 5

此序列:

from airflow.hooks.mysql_hook import MySqlHook
conn = MySqlHook(mysql_conn_id='conn_id')
engine = conn.get_sqlalchemy_engine()
df.to_sql('test_table', engine, if_exists='append', index=False)

生成以下内容:

UnicodeEncodeError: 'latin-1' codec can't encode character '\ufffd' in position 57: ordinal not in range(256)

这个序列效果很好:

from sqlalchemy import create_engine
engine = create_engine("mysql://{0}:{1}@{2}/capone?charset=utf8".format(user, pwd, host))
df.to_sql('test_table', engine, if_exists='append', index=False)

关键在于显式声明charset。我尝试用{"charset": "utf8"}在气流中做到这一点:

但这并没有修复错误。我重新启动了我的开发环境,因为进行了更改,管理面板告诉我编辑成功了。作为utf8,我如何使用到我的字符集的气流连接?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2019-06-21 03:33:03

我意识到这是气流中的一个错误,我在这里报告了它:https://issues.apache.org/jira/browse/AIRFLOW-4824

现在,我有一个使用以下代码的解决方法:

def get_uri(hook):
    conn = hook.get_connection(getattr(hook, hook.conn_name_attr))
    login = ''
    if conn.login:
        login = '{conn.login}:{conn.password}@'.format(conn=conn)
    host = conn.host
    if conn.port is not None:
        host += ':{port}'.format(port=conn.port)
    charset = ''
    if conn.extra_dejson.get('charset', False):
        chrs = conn.extra_dejson["charset"]
        if chrs.lower() == 'utf8' or chrs.lower() == 'utf-8':
            charset = '?charset=utf8'
    return '{conn.conn_type}://{login}{host}/{conn.schema}{charset}'.format(
        conn=conn, login=login, host=host, charset=charset)

然后按如下方式使用它:

url = get_uri(sql_hook)
from sqlalchemy import create_engine
engine = create_engine(url)

真正的解决方案是向项目发送一个pull请求,覆盖mysql_hook.py中的get_uri。

票数 2
EN

Stack Overflow用户

发布于 2020-07-28 12:27:07

顺便说一句,我解决了这个问题,工作正常(在airflow.cfg文件中编辑):

sql_alchemy_conn = mysql://user:password@host:port/airflow?charset=utf8
票数 2
EN

Stack Overflow用户

发布于 2019-07-22 15:00:40

from sqlalchemy import create_engine
from airflow.hooks.mysql_hook import MySqlHook

conn = MySqlHook(mysql_conn_id='conn_id')
uri = conn.get_uri()
engine = create_engine(uri+'?charset=utf8')
df.to_sql('test_table', engine, if_exists='append', index=False)

我通过上面的代码修复了这个问题。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46084744

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档