
Celery is not connecting to Redis in Flask

Stack Overflow user
Asked 2022-06-23 13:43:26
Answers: 1 · Views: 278 · Followers: 0 · Votes: 0

I have a Flask application that uses Celery.

This is the configuration flow:

app.common.globals.py

from flask import current_app as app
from celery import Celery

celery = Celery()

__init__.py

from app.common.globals import celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_BACKEND_URL'])
celery.conf.update(app.config)


def create_app(config_class=Config):
    app: Flask = Flask(__name__)
    app.config.from_object(config_class)
    logging.basicConfig(level=logging.INFO)
    debug = app.config['DEBUG']
    if debug:
        app.logger.setLevel(logging.INFO)

    with app.app_context():
        from app.common.globals import celery

    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_BACKEND_URL'])
    celery.conf.update(app.config)

    
    return app

config.py

CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_BACKEND_URL = 'redis://127.0.0.1:6379/0'

The application is then run from wsgi.py:

from app import create_app

app = create_app()
debug = app.config['DEBUG']


if __name__ == "__main__":
    app.run(host='127.0.0.1', port=5000, debug=debug, threaded=True)

I have a Redis server up and running and can connect to it without any problem:

redis-cli -u redis://localhost:6379/0                   
localhost:6379> 

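The problem is that Celery apparently cannot connect to the Redis server. It emits the warning below, tries to connect to the default amqp://guest:**@127.0.0.1:5672// broker instead, and fails.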

WARNING:kombu.connection:No hostname was supplied. Reverting to default 'localhost'
  File "/User/omar/venv/lib/python3.8/site-packages/kombu/connection.py", line 450, in _reraise_as_library_errors
    raise ConnectionError(str(exc)) from exc
kombu.exceptions.OperationalError: [Errno 61] Connection refused

When I print the configuration of the celery instance, it shows that the broker URL was read correctly.

celery = Celery(app, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_BACKEND_URL'])
celery.conf.update(app.config)
print(celery.conf)

<RESULT>

Settings(Settings({'broker_url': 'redis://127.0.0.1:6379/0', 
'result_backend': 'redis://localhost:6379', 'deprecated_settings': 
set()}, {}, {'accept_content': ['json'], 'result_accept_content': 
None, 'enable_utc': True, 'imports': (), 'include': (), 'timezone': 
None, 'beat_max_loop_interval': 0, 'beat_schedule': {}, 
'beat_scheduler': 'celery.beat:PersistentScheduler', 
'beat_schedule_filename': 'celerybeat-schedule', 'beat_sync_every': 
0, 'broker_url': 'redis://127.0.0.1:6379/0', 'broker_read_url': None,
 'broker_write_url': None, 'broker_transport': None, 
'broker_transport_options': {}, 'broker_connection_timeout': 4, 
'broker_connection_retry': True, 'broker_connection_max_retries': 
100, 'broker_failover_strategy': None, 'broker_heartbeat': 120, 
'broker_heartbeat_checkrate': 3.0, 'broker_login_method': None, 
'broker_pool_limit': 10, 'broker_use_ssl': False, 'broker_host': 
None, 'broker_port': None, 'broker_user': None, 'broker_password': 
None, 'broker_vhost': None, 'cache_backend': None, 
'cache_backend_options': {}, 'cassandra_entry_ttl': None, 
'cassandra_keyspace': None, 'cassandra_port': None, 
'cassandra_read_consistency': None, 'cassandra_servers': None, 'cassandra_table': None, 'cassandra_write_consistency': None, 'cassandra_auth_provider': None, 'cassandra_auth_kwargs': None, 'cassandra_options': {}, 's3_access_key_id': None, 's3_secret_access_key': None, 's3_bucket': None, 's3_base_path': None, 's3_endpoint_url': None, 's3_region': None, 'azureblockblob_container_name': 'celery', 'azureblockblob_retry_initial_backoff_sec': 2, 'azureblockblob_retry_increment_base': 2, 'azureblockblob_retry_max_attempts': 3, 'azureblockblob_base_path': '', 'azureblockblob_connection_timeout': 20, 'azureblockblob_read_timeout': 120, 'control_queue_ttl': 300.0, 'control_queue_expires': 10.0, 'control_exchange': 'celery', 'couchbase_backend_settings': None, 'arangodb_backend_settings': None, 'mongodb_backend_settings': None, 'cosmosdbsql_database_name': 'celerydb', 'cosmosdbsql_collection_name': 'celerycol', 'cosmosdbsql_consistency_level': 'Session', 'cosmosdbsql_max_retry_attempts': 9, 'cosmosdbsql_max_retry_wait_time': 30, 'event_queue_expires': 60.0, 'event_queue_ttl': 5.0, 'event_queue_prefix': 'celeryev', 'event_serializer': 'json', 'event_exchange': 'celeryev', 'redis_backend_use_ssl': None, 'redis_db': None, 'redis_host': None, 'redis_max_connections': None, 'redis_username': None, 'redis_password': None, 'redis_port': None, 'redis_socket_timeout': 120.0, 'redis_socket_connect_timeout': None, 'redis_retry_on_timeout': False, 'redis_socket_keepalive': False, 'result_backend': 'redis://localhost:6379', 'result_cache_max': -1, 'result_compression': None, 'result_exchange': 'celeryresults', 'result_exchange_type': 'direct', 'result_expires': datetime.timedelta(days=1), 'result_persistent': None, 'result_extended': False, 'result_serializer': 'json', 'result_backend_transport_options': {}, 'result_chord_retry_interval': 1.0, 'result_chord_join_timeout': 3.0, 'result_backend_max_sleep_between_retries_ms': 10000, 'result_backend_max_retries': inf, 
'result_backend_base_sleep_between_retries_ms': 10, 'result_backend_always_retry': False, 'elasticsearch_retry_on_timeout': None, 'elasticsearch_max_retries': None, 'elasticsearch_timeout': None, 'elasticsearch_save_meta_as_text': True, 'security_certificate': None, 'security_cert_store': None, 'security_key': None, 'security_digest': 'sha256', 'database_url': None, 'database_engine_options': None, 'database_short_lived_sessions': False, 'database_table_schemas': None, 'database_table_names': None, 'task_acks_late': False, 'task_acks_on_failure_or_timeout': True, 'task_always_eager': False, 'task_annotations': None, 'task_compression': None, 'task_create_missing_queues': True, 'task_inherit_parent_priority': False, 'task_default_delivery_mode': 2, 'task_default_queue': 'celery', 'task_default_exchange': None, 'task_default_exchange_type': 'direct', 'task_default_routing_key': None, 'task_default_rate_limit': None, 'task_default_priority': None, 'task_eager_propagates': False, 'task_ignore_result': False, 'task_store_eager_result': False, 'task_protocol': 2, 'task_publish_retry': True, 'task_publish_retry_policy': {'max_retries': 3, 'interval_start': 0, 'interval_max': 1, 'interval_step': 0.2}, 'task_queues': None, 'task_queue_max_priority': None, 'task_reject_on_worker_lost': None, 'task_remote_tracebacks': False, 'task_routes': None, 'task_send_sent_event': False, 'task_serializer': 'json', 'task_soft_time_limit': None, 'task_time_limit': None, 'task_store_errors_even_if_ignored': False, 'task_track_started': False, 'worker_agent': None, 'worker_autoscaler': 'celery.worker.autoscale:Autoscaler', 'worker_cancel_long_running_tasks_on_connection_loss': False, 'worker_concurrency': None, 'worker_consumer': 'celery.worker.consumer:Consumer', 'worker_direct': False, 'worker_disable_rate_limits': False, 'worker_deduplicate_successful_tasks': False, 'worker_enable_remote_control': True, 'worker_hijack_root_logger': True, 'worker_log_color': None, 'worker_log_format': 
'[%(asctime)s: %(levelname)s/%(processName)s] %(message)s', 'worker_lost_wait': 10.0, 'worker_max_memory_per_child': None, 'worker_max_tasks_per_child': None, 'worker_pool': 'prefork', 'worker_pool_putlocks': True, 'worker_pool_restarts': False, 'worker_proc_alive_timeout': 4.0, 'worker_prefetch_multiplier': 4, 'worker_redirect_stdouts': True, 'worker_redirect_stdouts_level': 'WARNING', 'worker_send_task_events': False, 'worker_state_db': None, 'worker_task_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s', 'worker_timer': None, 'worker_timer_precision': 1.0, 'deprecated_settings': None}))

Any idea what I am doing wrong?


Stack Overflow user

Accepted answer

Posted 2022-06-23 19:15:32

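In your __init__.py file you create the Celery object twice: once at module level and once inside create_app(). Instead, you can use the Flask extension pattern to attach the Flask app to Celery within the application factory, as follows. In a custom.celery.py file: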

from celery import Celery
import flask


class FlaskCelery(Celery):
    """Extend celery task to use flask app."""
    def __init__(self, *args, **kwargs):
        """Def initialization fnx."""
        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if "app" in kwargs:
            self.init_app(kwargs["app"])

    def patch_task(self):
        """Extend celery task."""
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            """Extend taskbase."""
            abstract = True

            def __call__(self, *args, **kwargs):
                """Class caller definition."""
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        """Initialize celery with app."""
        self.app = app
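
The `__call__` override above only opens an application context when none is active. The following is a dependency-free sketch of that wrap-if-needed pattern. It does not use Flask or Celery: `_active_app`, `app_context`, `has_app_context`, and this `ContextTask` are hypothetical stand-ins built on the standard library, written only to illustrate the control flow.

```python
import contextvars
from contextlib import contextmanager

# Hypothetical stand-in for Flask's application context (not Flask's API).
_active_app = contextvars.ContextVar("active_app", default=None)


@contextmanager
def app_context(app_name):
    """Mark app_name as active for the duration of the with-block."""
    token = _active_app.set(app_name)
    try:
        yield
    finally:
        _active_app.reset(token)


def has_app_context():
    """Return True if some app is currently marked as active."""
    return _active_app.get() is not None


class ContextTask:
    """Callable that runs func inside an app context, opening one if needed."""

    def __init__(self, app_name, func):
        self.app_name = app_name
        self.func = func

    def __call__(self, *args, **kwargs):
        if has_app_context():
            # A context is already active (e.g. called from a request).
            return self.func(*args, **kwargs)
        # No context yet (e.g. called from a worker): open one first.
        with app_context(self.app_name):
            return self.func(*args, **kwargs)


def report_active_app():
    """Example task body: returns whichever app is active when it runs."""
    return _active_app.get()


task = ContextTask("demo-app", report_active_app)
seen_by_task = task()                 # context opened by the wrapper
active_after_call = has_app_context() # context closed again afterwards
print(seen_by_task, active_after_call)
```

In the answer's class the same decision is made with `flask.has_app_context()` and `_celery.app.app_context()`, so task bodies can rely on an application context being active in both cases.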

In the extensions.py file:

celery = FlaskCelery(broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')
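
The URLs above are hard-coded, whereas the question keeps them in the Flask config. One possible way to bridge the two, sketched here under assumptions, is a small helper that maps the question's config keys onto the lowercase setting names visible in the printed Celery configuration (`broker_url`, `result_backend`). The helper name `celery_settings_from_flask` is hypothetical and is not part of the original answer.

```python
from typing import Dict, Mapping


def celery_settings_from_flask(config: Mapping[str, object]) -> Dict[str, object]:
    """Hypothetical helper: pick the two Celery-related keys used in the
    question's config.py and rename them to Celery's lowercase settings."""
    return {
        "broker_url": config["CELERY_BROKER_URL"],
        "result_backend": config["CELERY_BACKEND_URL"],
    }


# Inside FlaskCelery.init_app this could be applied roughly as follows
# (an assumption, shown as a comment so the sketch needs no dependencies):
#     self.app = app
#     self.conf.update(celery_settings_from_flask(app.config))

# Plain dict standing in for app.config, with the values from the question.
flask_config = {
    "CELERY_BROKER_URL": "redis://127.0.0.1:6379/0",
    "CELERY_BACKEND_URL": "redis://127.0.0.1:6379/0",
    "DEBUG": True,
}
settings = celery_settings_from_flask(flask_config)
print(settings)
```

Passing only the two renamed keys keeps unrelated Flask settings such as `DEBUG` out of the Celery configuration.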

Then in the __init__.py file:

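import logging

from flask import Flask

from extensions import celery
# Config is assumed to be imported here as in the question's __init__.py;
# that import is not shown in the original.


def create_app(config_class=Config):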
    app: Flask = Flask(__name__)
    app.config.from_object(config_class)
    logging.basicConfig(level=logging.INFO)
    debug = app.config['DEBUG']
    if debug:
        app.logger.setLevel(logging.INFO)

    celery.init_app(app)
    return app
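
To illustrate why creating the object a second time inside create_app() likely left the shared instance unconfigured, here is a minimal sketch. `FakeCelery` is a hypothetical stand-in that only stores a broker URL; it is not the real `celery.Celery` class, and the two factory functions are simplified for illustration.

```python
class FakeCelery:
    """Hypothetical stand-in for celery.Celery; only stores a broker URL."""

    def __init__(self, broker=None):
        self.conf = {"broker_url": broker}


# Shared object, playing the role of `celery` in app/common/globals.py.
# Tasks elsewhere in the project would be registered on this instance.
celery = FakeCelery()


def create_app_rebinding(broker_url):
    # Mirrors the question: the assignment binds a new *local* name,
    # so the shared module-level object is never configured.
    celery = FakeCelery(broker=broker_url)
    return celery


def create_app_configuring(broker_url):
    # Mirrors the idea behind init_app: configure the shared object in place.
    celery.conf["broker_url"] = broker_url
    return celery


local = create_app_rebinding("redis://127.0.0.1:6379/0")
broker_after_rebinding = celery.conf["broker_url"]

create_app_configuring("redis://127.0.0.1:6379/0")
broker_after_configuring = celery.conf["broker_url"]

print(broker_after_rebinding)    # None
print(broker_after_configuring)  # redis://127.0.0.1:6379/0
```

With the real library, an instance left without a broker URL would be the one that falls back to the default amqp:// broker reported in the question.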
Votes: 0
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/72731242
