首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >airflow插件未正确拾取

airflow插件未正确拾取
EN

Stack Overflow用户
提问于 2018-04-18 01:11:07
回答 2查看 6.7K关注 0票数 5

我们使用的是Apache 1.9.0。我写了一个雪花钩子插件。我已经将钩子放在$AIRFLOW_HOME/plugins目录中。

$AIRFLOW_HOME
  +--plugins
    +--snowflake_hook2.py

snowflake_hook2.py

# This is the base class for a plugin
from airflow.plugins_manager import AirflowPlugin

# This is necessary to expose the plugin in the Web interface
from flask import Blueprint
from flask_admin import BaseView, expose
from flask_admin.base import MenuLink

# This is the base hook for connecting to a database
from airflow.hooks.dbapi_hook import DbApiHook

# This is the snowflake provided Connector
import snowflake.connector

# This is the default python logging package
import logging

class SnowflakeHook2(DbApiHook):
    """
    Airflow Hook to communicate with Snowflake
    This is implemented as a Plugin
    """
    def __init__(self, connname_in='snowflake_default', db_in='default', wh_in='default', schema_in='default'):
        logging.info('# Connecting to {0}'.format(connname_in))
        self.conn_name_attr = 'snowflake_conn_id'
        self.connname = connname_in
        self.superconn = super().get_connection(self.connname) #gets the values from Airflow

        {SNIP - Connection stuff that works}
        self.cur = self.conn.cursor()

    def query(self,q,params=None):
        """From jmoney's db_wrapper allows return of a full list of rows(tuples)"""
        if params == None: #no Params, so no insertion
            self.cur.execute(q)
        else: #make the parameter substitution
            self.cur.execute(q,params)
        self.results = self.cur.fetchall()
        self.rowcount = self.cur.rowcount
        self.columnnames = [colspec[0] for colspec in self.cur.description]
        return self.results
    {SNIP - Other class functions}

class SnowflakePluginClass(AirflowPlugin):
    name = "SnowflakePluginModule"
    hooks = [SnowflakeHook2]
    operators = []

因此,我继续在Airflow plugin_manager中添加了一些打印语句,以尝试更好地处理正在发生的事情。在重启were服务器并运行airflow list_dags之后,这些行显示"new module name“(新模块名称)(没有错误

SnowflakePluginModule [<class '__home__ubuntu__airflow__plugins_snowflake_hook2.SnowflakeHook2'>]
hook_module -  airflow.hooks.snowflakepluginmodule
INTEGRATING airflow.hooks.snowflakepluginmodule
snowflakepluginmodule <module 'airflow.hooks.snowflakepluginmodule'>

因为这与文档中所说的一致,所以我应该可以在我的DAG中使用以下内容:

from airflow import DAG
from airflow.hooks.snowflakepluginmodule import SnowflakeHook2 
from airflow.operators.python_operator import PythonOperator

但是web抛出了这个错误

Broken DAG: [/home/ubuntu/airflow/dags/test_sf2.py] No module named 'airflow.hooks.snowflakepluginmodule'

所以问题是,我做错了什么?还是我发现了一个bug?

EN

回答 2

Stack Overflow用户

发布于 2018-06-23 08:09:40

您需要导入如下内容:

from airflow import DAG
from airflow.hooks import SnowflakeHook2 
from airflow.operators.python_operator import PythonOperator

from airflow import DAG
from airflow.hooks.SnowflakePluginModule import SnowflakeHook2 
from airflow.operators.python_operator import PythonOperator
票数 4
EN

Stack Overflow用户

发布于 2018-04-20 20:49:49

我不认为airflow会自动遍历插件目录中的文件夹并运行它下面的所有内容。我成功设置它的方法是在包含每个插件类的插件目录下有一个__init__.py。看看Github中的Astronomer插件,它为如何设置插件提供了一些非常好的示例。

特别是看看他们是如何设置mysql插件的

https://github.com/airflow-plugins/mysql_plugin

另外,有人在airflow的一个较新版本中加入了一个雪花钩子,你可能想要利用它:

https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/snowflake_hook.py

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/49883840

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档