
Docker-compose with Airflow - MS SQL Server (connection failed)

Stack Overflow user
Asked on 2019-05-29 17:29:28
1 answer · 1.9K views · 0 followers · 0 votes

I can't connect to SQL Server from Airflow running inside Docker with docker-compose. I want to pull data straight from SQL Server into Cloud Storage and then send it on to BigQuery.

How can I solve this?

import json
from datetime import timedelta, datetime

from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator


default_args = {
    'owner': 'Test Data',
    'depends_on_past': True,    
    'start_date': datetime(2019, 5, 29),
    'end_date': datetime(2019, 5, 30),
    'email': ['email@clientx.com.br'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Set Schedule: "* * * * *" runs the pipeline every minute.
# Use cron to define the exact time. Eg. daily at 8:15am would be "15 08 * * *"
schedule_interval = "* * * * *"

# Define DAG: Set ID and assign default args and schedule interval
dag = DAG(
    'bigquery_github_trends', 
    default_args=default_args, 
    schedule_interval=schedule_interval
    )


extract = MySqlToGoogleCloudStorageOperator(
    task_id='chama_extract',
    mysql_conn_id='mysql_hml',
    google_cloud_storage_conn_id='my_gcp_conn',
    sql="""SELECT * FROM test""",
    bucket='my_bucket',
    filename='test/test{}.json',
    schema_filename='schemas/test.json',
    dag=dag)

load = GoogleCloudStorageToBigQueryOperator(
            task_id='chama_load',
            bigquery_conn_id='my_gcp_conn',
            google_cloud_storage_conn_id='my_gcp_conn',
            bucket='my_bucket',
            destination_project_dataset_table="tst.teste123",
            source_objects=['test/test0.json'],
            schema_object='schemas/test.json',
            source_format='NEWLINE_DELIMITED_JSON',
            create_disposition='CREATE_IF_NEEDED',
            write_disposition='WRITE_TRUNCATE',
            dag=dag)


# Setting up Dependencies
load.set_upstream(extract)
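One thing worth flagging in this DAG: the question describes an MS SQL Server source, but MySqlToGoogleCloudStorageOperator speaks only the MySQL protocol, which on its own can produce exactly this kind of connection failure. Airflow's contrib tree also ships an MS SQL counterpart (in newer 1.10.x releases). Below is a minimal sketch of the extract task using it; the conn_id 'mssql_hml' is a placeholder, not a name from the question, and it assumes the pymssql dependency is installed in the image:

# Hedged sketch: MS SQL Server variant of the extract task.
# Assumes Airflow >= 1.10.3 (where contrib's mssql_to_gcs module exists)
# and a connection with the placeholder conn_id 'mssql_hml' pointing at
# the SQL Server instance.
from airflow.contrib.operators.mssql_to_gcs import MsSqlToGoogleCloudStorageOperator

extract = MsSqlToGoogleCloudStorageOperator(
    task_id='chama_extract',
    mssql_conn_id='mssql_hml',          # placeholder conn_id
    google_cloud_storage_conn_id='my_gcp_conn',
    sql="SELECT * FROM test",
    bucket='my_bucket',
    filename='test/test{}.json',        # {} is filled with a file counter
    schema_filename='schemas/test.json',
    dag=dag)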

docker-compose.yml

version: '3'
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"

  webserver:
    image: puckel/docker-airflow:1.10.1
    build:
      context: https://github.com/puckel/docker-airflow.git#1.10.1
      dockerfile: Dockerfile
      args:
        AIRFLOW_DEPS: gcp_api,s3        
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      - FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
    volumes:
      - ./examples/intro-example/dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

docker-compose-gcloud.yml

version: '3'
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"

  webserver:
    image: puckel/docker-airflow:1.10.1
    build:
      context: https://github.com/puckel/docker-airflow.git#1.10.1
      dockerfile: Dockerfile
      args:
        AIRFLOW_DEPS: gcp_api,s3        
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      - FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
    volumes:
      - ./examples/gcloud-example/dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

And I run this command for Docker:

docker-compose -f docker-compose-gcloud.yml up --abort-on-container-exit

Error message in Airflow:

[2019-05-29 07:00:37,938] {logging_mixin.py:95} INFO - [2019-05-29 07:00:37,937] {base_hook.py:83} INFO - Using connection to: 10.0.0.1
[2019-05-29 07:00:58,974] {models.py:1760} ERROR - (2003, 'Can\'t connect to MySQL server on \'10.0.0.1\' (111 "Connection refused")')
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 105, in execute
    cursor = self._query_mysql()
  File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 127, in _query_mysql
    conn = mysql.get_conn()
  File "/usr/local/lib/python3.6/site-packages/airflow/hooks/mysql_hook.py", line 103, in get_conn
    conn = MySQLdb.connect(**conn_config)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/__init__.py", line 84, in Connect
    return Connection(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 164, in __init__
    super(Connection, self).__init__(*args, **kwargs2)
MySQLdb._exceptions.OperationalError: (2003, 'Can\'t connect to MySQL server on \'10.0.0.1\' (111 "Connection refused")')
[2019-05-29 07:00:58,988] {models.py:1789} INFO - All retries failed; marking task as failed
[2019-05-29 07:00:58,992] {logging_mixin.py:95} INFO - [2019-05-29 07:00:58,991] {configuration.py:255} WARNING - section/key [smtp/smtp_user] not found in config
[2019-05-29 07:00:58,998] {models.py:1796} ERROR - [Errno 99] Cannot assign requested address
Traceback (most recent call last):
  ... (same traceback as above) ...
MySQLdb._exceptions.OperationalError: (2003, 'Can\'t connect to MySQL server on \'10.0.0.1\' (111 "Connection refused")')
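Editor's note on reading this log: errno 111 ("Connection refused") means the container reached 10.0.0.1 but nothing accepted the connection, and the later [Errno 99] appears to be only the failure-notification email attempt failing, not the root cause. Under docker-compose the webserver runs in its own network namespace, so a database listening on the host (or bound only to localhost) may be unreachable at an address that works outside Docker. One common remedy is to map the host into the container; a minimal sketch against the compose file above, assuming Docker Engine 20.10+ for the host-gateway value (on Docker Desktop, host.docker.internal resolves even without this entry):

  webserver:
    # ... existing settings from the compose file above unchanged ...
    extra_hosts:
      # Makes the Docker host reachable from inside the container under a
      # stable hostname; the Airflow connection would then point at
      # host.docker.internal instead of 10.0.0.1.
      - "host.docker.internal:host-gateway"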

Stack Overflow user

Answered on 2019-05-30 07:27:50

From the error, the key part for me is the "get_conn" bit. It shows that the failure happens when Airflow tries to establish the connection to the database. That means either your connection isn't specified at all (though it looks like it is), or some part of it is incorrect.

You should check that the password, server address and port are correct. These should be in your airflow.cfg, set as environment variables, or configured in the webserver (admin panel).
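On the environment-variable route the answer mentions: Airflow builds a connection from any AIRFLOW_CONN_<CONN_ID> variable holding a connection URI, so the mysql_hml connection the DAG uses could be injected straight into the webserver service. A minimal sketch; the user, password, port and database below are placeholders, not values from the question:

  webserver:
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      # URI format: mysql://<user>:<password>@<host>:<port>/<schema>
      # Everything after 'mysql://' is a placeholder to adapt.
      - AIRFLOW_CONN_MYSQL_HML=mysql://user:password@host.docker.internal:3306/mydb

Connections defined this way don't show up in the web UI's Connections list, but hooks resolve them the same way as ones stored in the metadata database.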

0 votes
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/56365779
