Django 项目中 Celery task Debug 调试

celery debug 调试直接用django 不是很方便

可以采用 pdb 的形式进行调试,在celery 中她对 pdb 做了一次的封装,变成了rdb

引入rdb

Python
from <span class="wp_keywordlink_affiliate"><a href="https://www.168seo.cn/tag/celery" title="View all posts in celery" target="_blank">celery</a></span>.contrib import <span class="wp_keywordlink_affiliate"><a href="https://www.168seo.cn/tag/rdb" title="View all posts in rdb" target="_blank">rdb</a></span> #引入包 rdb.set_trace() # 设置断点
1
2
3
4
from celery . contrib import rdb #引入包
 
rdb . set_trace ( ) # 设置断点
 

启动django

Python
Django version 2.1.4, using settings 'taskproj.settings' Development server is running at http://[127.0.0.1]:8000/ Using the Werkzeug debugger (http://werkzeug.pocoo.org/) Quit the server with CONTROL-C. * Debugger is active! * Debugger PIN: 329-253-017 127.0.0.1 - - [03/Jul/2019 16:36:03] "POST /start_task/ HTTP/1.1" 200 -
1
2
3
4
5
6
7
8
Django version 2.1.4 , using settings 'taskproj.settings'
Development server is running at http : / / [ 127.0.0.1 ] : 8000 /
Using the Werkzeug debugger ( http : / / werkzeug . pocoo . org / )
Quit the server with CONTROL - C .
* Debugger is active !
* Debugger PIN : 329 - 253 - 017
127.0.0.1 - - [ 03 / Jul / 2019 16 : 36 : 03 ] "POST /start_task/ HTTP/1.1" 200 -
 

启动worker

Python
. app01.tasks.add . app01.tasks.get_url . app01.tasks.mul . app01.utils.tasks.C . app01.utils.tasks.extract_data . app01.utils.tasks.extract_urls . app01.utils.tasks.get_contents . app01.utils.tasks.parser . app01.utils.tasks.xpath_extract_urls . celery.accumulate . celery.backend_cleanup . celery.chain . celery.chord . celery.chord_unlock . celery.chunks . celery.group . celery.map . celery.starmap . taskproj.celery.debug_task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
   . app01 . tasks . add
   . app01 . tasks . get _url
   . app01 . tasks . mul
   . app01 . utils . tasks . C
   . app01 . utils . tasks . extract _data
   . app01 . utils . tasks . extract _urls
   . app01 . utils . tasks . get _contents
   . app01 . utils . tasks . parser
   . app01 . utils . tasks . xpath_extract _urls
   . celery . accumulate
   . celery . backend _cleanup
   . celery . chain
   . celery . chord
   . celery . chord _unlock
   . celery . chunks
   . celery . group
   . celery . map
   . celery . starmap
   . taskproj . celery . debug _task
 
 

发送生产者

Python
<span class="wp_keywordlink"><a href="http://www.168seo.cn/python" title="python">python</a></span> post_data.py
1
2
python post_data . py
 

观察worker

Python
-de13-4aca-8cb9-22f32ea5d00f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '8461613f-18ab-4756-8fff-e56c164613b1', 'parent_ id': '8461613f-18ab-4756-8fff-e56c164613b1', 'argsrepr': "(['https://www.csai.cn/gonglue/', 'https://www.csai.cn/pingce/', 'https://jin.baidu.com/article', 'https://jin.baidu.com/article/car', 'https://jin.baidu.com/article/list_120 000', 'https://jin.baidu.com/article/list_160000'],)", 'kwargsrepr': '{}', 'origin': 'gen15574@songhao-pc', 'reply_to': 'd9fda1c9-044d-38e5-b640-d7f418f525b3', 'correlation_id': '7515aa8e-de13-4aca-8cb9-22f32ea5d00f', 'delivery_info ': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': False}}, '[[["https://www.csai.cn/gonglue/", "https://www.csai.cn/pingce/", "https://jin.baidu.com/article",... kwargs:{}) [2019-07-04 09:18:46,773: DEBUG/MainProcess] Task accepted: app01.utils.tasks.xpath_extract_urls[7515aa8e-de13-4aca-8cb9-22f32ea5d00f] pid:15575 [2019-07-04 09:18:46,776: WARNING/ForkPoolWorker-2] Remote Debugger:6901: Ready to connect: telnet 127.0.0.1 6901 Type `exit` in session to continue. Remote Debugger:6901: Waiting for client... # 监听的是 6901
1
2
3
4
5
6
7
8
9
10
11
- de13 - 4aca - 8cb9 - 22f32ea5d00f ', ' shadow ': None, ' eta ': None, ' expires ': None, ' group ': None, ' retries ': 0, ' timelimit ': [None, None], ' root _id ': ' 8461613f - 18ab - 4756 - 8fff - e56c164613b1 ', ' parent_
id ': ' 8461613f - 18ab - 4756 - 8fff - e56c164613b1 ', ' argsrepr ': "([' https : / / www . csai . cn / gonglue / ', ' https : / / www . csai . cn / pingce / ', ' https : / / jin . baidu . com / article ', ' https : / / jin . baidu . com / article / car ', ' https : / / jin . baidu . com / article / list _120
000 ', ' https : / / jin . baidu . com / article / list _160000 '],)", ' kwargsrepr ': ' { } ', ' origin ': ' gen15574 @ songhao - pc ', ' reply _to ': ' d9fda1c9 - 044d - 38e5 - b640 - d7f418f525b3 ', ' correlation _id ': ' 7515aa8e - de13 - 4aca - 8cb9 - 22f32ea5d00f ', ' delivery _info
': {' exchange ': ' ', ' routing _key ': ' celery ', ' priority ': 0, ' redelivered ': False}}, ' [ [ [ "https://www.csai.cn/gonglue/" , "https://www.csai.cn/pingce/" , "https://jin.baidu.com/article" , . . . kwargs : { } )
[ 2019 - 07 - 04 09 : 18 : 46 , 773 : DEBUG / MainProcess ] Task accepted : app01 . utils . tasks . xpath_extract_urls [ 7515aa8e - de13 - 4aca - 8cb9 - 22f32ea5d00f ] pid : 15575
[ 2019 - 07 - 04 09 : 18 : 46 , 776 : WARNING / ForkPoolWorker - 2 ] Remote Debugger : 6901 : Ready to connect : telnet 127.0.0.1 6901
 
Type ` exit ` in session to continue .
 
Remote Debugger : 6901 : Waiting for client . . . # 监听的是 6901
 

打开监听端口 进行调试

rdb 调试操作



  • zeropython 微信公众号 5868037 QQ号 5868037@qq.com QQ邮箱
<think>嗯,我现在需要在Django项目启动时同时触发Kafka消费者的异步执行。这听起来有点挑战,因为我对Django和Kafka都不太熟悉。让我先理清楚问题。 首先,Django项目启动时,通常是指运行开发服务器(如python manage.py runserver)或者生产环境的WSGI服务器(比如Gunicorn或uWSGI)。这个时候,我需要启动Kafka消费者,让它开始监听消息,并且不能阻塞主线程,否则可能会影响Django的正常运行。所以,我需要找到在Django启动时执行某些代码的方法,同时处理好异步执行的问题。 我记得Django有一些应用启动时的钩子,比如AppConfig的ready()方法。AppConfig是Django应用的一个配置类,每个应用都可以有一个,而ready()方法会在Django完全加载后执行。所以,我应该在某个AppConfig的ready()方法里启动消费者。但需要确定这个ready()方法是否会在每次Django启动时被调用,比如在开发服务器运行时可能会有多次加载,但生产环境可能只加载一次。需要确认这一点,否则可能导致消费者被多次启动。 接下来,如何启动Kafka消费者呢?消费者应该是一个持续运行的进程,监听消息。通常,Kafka消费者会在一个循环中不断拉取消息。如果直接在ready()方法里启动这个循环,那么可能会阻塞Django的主线程,导致服务无法正常处理请求。因此,必须让消费者在一个单独的线程或者进程中运行,或者使用异步框架来处理。 所以,可能的解决方案是:在ready()方法中创建一个新的线程,启动消费者的循环。这样主线程(Django)可以继续处理请求,而消费者在后台运行。但线程安全需要注意,比如如何正确启动和停止线程,避免资源竞争等问题。 另外,需要考虑在生产环境和开发环境中的不同表现。例如,使用Gunicorn启动时,可能会有多个工作进程,每个进程都启动一个消费者线程,这可能会导致重复消费的问题。因此,可能需要确保只有一个消费者实例在运行,或者使用消费者组来管理分区。 具体步骤可能如下: 1. 在Django的某个应用下的apps.py中创建自定义的AppConfig,并重写ready()方法。 2. 在ready()方法中,检查是否已经启动过消费者线程,避免重复启动。 3. 启动一个守护线程,运行消费者的监听循环。 4. 消费者代码需要正确关闭资源,比如在Django关闭时停止消费者,可能需要注册信号处理器。 5. 处理可能出现的异常,确保线程的健壮性。 但如何确保线程只启动一次呢?因为如果在使用自动重新加载的开发服务器中,Django可能会多次加载应用,导致多次调用ready()方法。这时需要一种机制来确保线程只启动一次,比如使用一个标志位或者检查线程是否已经存在。 另外,线程中的异常处理也很重要。如果消费者线程因为某些错误退出,可能需要重新启动它,或者记录错误信息。 还有一个问题是,当Django服务器关闭时,如何优雅地停止消费者线程。可能需要监听Django的关闭信号,发送停止信号给消费者线程,让它退出循环,并关闭Kafka客户端的连接。 总结一下,可能的实现步骤: - 创建自定义AppConfig,在ready()中启动消费者线程。 - 使用线程来运行消费者的消费循环,避免阻塞主线程。 - 确保线程只启动一次,尤其是在开发环境中。 - 处理服务器的关闭信号,正确关闭消费者。 - 考虑多进程环境下的问题,比如使用Gunicorn的prefork模型可能会导致多个进程启动消费者,可能需要其他机制来协调。 现在,我需要验证这些思路是否正确。例如,在AppConfig的ready()方法中启动线程是否可行?是的,但需要注意,Django的开发服务器可能会运行两次ready()方法,因为自动重载机制。因此,可能需要添加一个检查条件,例如仅在主进程或特定条件下启动线程。 另外,关于守护线程与非守护线程的区别。守护线程会在主线程退出时自动终止,而非守护线程会阻止程序退出。所以,如果消费者线程设置为守护线程,当Django主线程结束时,消费者线程也会自动终止。这可能适合生产环境中的正常关闭,但需要确保在关闭前消费者能完成当前任务。 关于Kafka消费者的实现,可能需要使用confluent-kafka或者kafka-python库。例如,使用kafka-python的Consumer类,然后在循环中poll消息,处理它们。在线程中运行这个循环,并且在收到关闭信号时跳出循环,关闭consumer。 可能的代码结构示例: 在apps.py中: ```python from django.apps import AppConfig import threading from .kafka_consumer import start_consumer class MyAppConfig(AppConfig): default_auto_field = 'django.db.models.BigAutoField' name = 'myapp' def ready(self): # 确保只在主线程中启动一次 if not threading.current_thread().daemon: consumer_thread = threading.Thread(target=start_consumer, daemon=True) consumer_thread.start() ``` 这里的start_consumer函数可能是定义在kafka_consumer模块中的一个函数,包含消费者的循环。 但这样可能还是无法避免在开发服务器中多次启动的问题,因为当自动重载时,主线程可能再次执行ready()。所以,可能需要使用一个标志位,例如模块级的变量,来记录是否已经启动了线程。 例如: ```python _consumer_started = False class MyAppConfig(AppConfig): # ... def ready(self): global _consumer_started if not _consumer_started and not threading.current_thread().daemon: _consumer_started = True consumer_thread = threading.Thread(target=start_consumer, daemon=True) consumer_thread.start() ``` 不过,全局变量可能在某些情况下不可靠,比如在某些服务器配置中。此外,如果使用Gunicorn等预fork服务器,每个工作进程都会启动自己的线程,可能导致多个消费者同时运行,这可能会有问题。这时候可能需要其他机制,比如单独运行消费者进程,而不是在Django应用中启动。 但问题要求的是在Django项目启动时触发,所以可能只能在同一个进程中处理,或者需要其他方式协调。 另一个注意事项是,生产环境中的Django通常由WSGI服务器运行,而这些服务器可能以多进程方式工作。例如,Gunicorn会启动多个工作进程,每个进程都会运行AppConfig的ready()方法,导致多个消费者进程。这种情况下,每个消费者都会加入同一个消费者组,Kafka会自动分配分区,这样可能不会有问题,但需要确保消费者组设置正确。不过,这可能不是最优的,因为每个进程的消费者可能处理不同的分区消息,但需要根据实际情况评估。 或者,可以仅在主进程(例如Gunicorn的主控制器进程,而不是工作进程)中启动消费者,但这需要区分进程类型,可能比较复杂。 这种情况下,可能更好的做法是将消费者作为一个独立的管理命令运行,例如使用python manage.py consume_kafka,并结合supervisor等进程管理工具来管理。但用户的问题明确要求在Django启动时触发,所以可能需要整合到Django的启动流程中。 回到原来的思路,可能的实现步骤: 1. 在AppConfig的ready()方法中启动消费者线程,但确保线程只启动一次,例如通过模块级变量或者检查线程是否存活。 2. 消费者线程作为守护线程运行,避免阻止Django进程退出。 3. 处理信号,比如在接收到SIGTERM或SIGINT时,优雅地关闭消费者。 例如,可以注册信号处理函数,当Django收到关闭信号时,通知消费者线程停止。 但Python中的信号处理需要主线程来处理,而子线程可能无法捕获信号。所以,可能的做法是在主线程中设置一个标志,消费者线程定期检查该标志,以决定是否退出循环。 或者,在消费者循环中处理异常,比如KeyboardInterrupt,但这可能不够可靠。 另一个方法是使用Event对象来通知线程停止: ```python import threading from django.apps import AppConfig from django import setup from signal import signal, SIGINT, SIGTERM stop_event = threading.Event() def start_consumer(): consumer = KafkaConsumer(...) while not stop_event.is_set(): # poll消息并处理 records = consumer.poll(timeout_ms=1000) # 处理records consumer.close() class MyAppConfig(AppConfig): def ready(self): if ...: # 条件避免重复启动 consumer_thread = threading.Thread(target=start_consumer, daemon=True) consumer_thread.start() # 注册信号处理 def handle_signal(signum, frame): stop_event.set() consumer_thread.join() signal(SIGINT, handle_signal) signal(SIGTERM, handle_signal) ``` 但需要注意,Django可能已经注册了自己的信号处理函数,覆盖它们可能会导致问题。此外,在WSGI服务器中,信号的处理可能由服务器自己管理,所以这种方法可能不可靠。 因此,可能更可靠的方式是使用Django的请求结束信号或者连接关闭时的回调,但这些可能不适用于所有情况。 综上所述,可能的解决方案是在AppConfig的ready()方法中启动一个守护线程来运行Kafka消费者,并在该线程中循环处理消息,同时设置一个停止事件,当Django进程结束时,通过信号或其他方式触发该事件,使消费者线程退出。 但实际应用中,可能还需要考虑更多的边界情况,例如: - 消费者线程的异常处理,防止线程崩溃后无法重启。 - 确保Kafka消费者的配置正确,比如消费者组ID、自动提交等。 - 在开发环境中,自动重载可能导致线程多次启动,需要使用条件检查来避免。 - 在生产环境中多进程启动时的处理,可能需要调整Kafka消费者的配置,使其适合多进程消费。 最终,用户可能需要根据具体需求和环境调整实现方式。例如,在开发环境中直接使用线程启动消费者,而在生产环境中使用独立的管理命令结合进程管理工具来运行消费者,可能更为可靠。但根据问题描述,用户希望的是在Django启动时自动触发,因此整合到AppConfig中的方法更为合适。</think>在Django项目启动时触发Kafka消费者异步执行,可以通过以下步骤实现: --- ### 1. **创建自定义AppConfig** 在Django应用的`apps.py`中定义自定义`AppConfig`,并重写`ready()`方法。此方法会在Django启动时执行。 ```python # myapp/apps.py from django.apps import AppConfig import threading from django.conf import settings from kafka import KafkaConsumer import logging logger = logging.getLogger(__name__) class MyAppConfig(AppConfig): default_auto_field = 'django.db.models.BigAutoField' name = 'myapp' def ready(self): # 仅在主线程中运行(避免开发服务器重载时重复启动) if not settings.DEBUG or threading.current_thread().name == 'MainThread': self.start_kafka_consumer() def start_kafka_consumer(self): # 避免重复启动 if not hasattr(self, '_consumer_thread'): self._consumer_thread = threading.Thread( target=self.run_kafka_consumer, daemon=True # 设置为守护线程,Django退出时自动终止 ) self._consumer_thread.start() logger.info("Kafka consumer thread started.") def run_kafka_consumer(self): consumer = KafkaConsumer( 'your_topic', bootstrap_servers=['localhost:9092'], group_id='your_consumer_group', auto_offset_reset='earliest' ) try: for message in consumer: # 处理消息(需实现业务逻辑) logger.info(f"Received message: {message.value}") except Exception as e: logger.error(f"Kafka consumer error: {e}") finally: consumer.close() ``` --- ### 2. **注册自定义AppConfig** 在应用的`__init__.py`中指定默认的`AppConfig`: ```python # myapp/__init__.py default_app_config = 'myapp.apps.MyAppConfig' ``` --- ### 3. **注意事项** - **避免开发服务器重复启动**:Django开发服务器(`runserver`)在代码修改时会自动重启,导致`ready()`被多次调用。通过检查`MainThread`或`DEBUG`模式可避免重复启动线程。 - **生产环境多进程问题**:使用Gunicorn等WSGI服务器时,每个工作进程都会启动消费者线程,可能导致重复消费。需确保Kafka消费者组(`group_id`)配置正确,或改用独立进程运行消费者。 - **优雅退出**:通过守护线程(`daemon=True`)确保Django退出时线程自动终止。若需资源清理,可监听Django关闭信号(如`request_finished`)。 --- ### 4. **优化方案:信号处理** 注册信号处理器,确保Django退出时消费者正确关闭: ```python # myapp/apps.py from signal import signal, SIGINT, SIGTERM import sys class MyAppConfig(AppConfig): # ... 其他代码同上 ... def run_kafka_consumer(self): self._shutdown_flag = False # 注册信号处理器 signal(SIGINT, self.handle_signal) signal(SIGTERM, self.handle_signal) consumer = KafkaConsumer(...) while not self._shutdown_flag: # 使用poll而非for循环,以便检查退出标志 msg = consumer.poll(timeout_ms=1000) if msg: # 处理消息 pass def handle_signal(self, signum, frame): self._shutdown_flag = True sys.exit(0) ``` --- ### 5. **替代方案:使用Celery** 若需更高可靠性,可将消费者作为异步任务交给Celery: ```python # tasks.py from celery import shared_task @shared_task def consume_kafka_messages(): consumer = KafkaConsumer(...) for message in consumer: # 处理消息 ``` 然后在AppConfig中触发Celery任务: ```python # apps.py def ready(self): from .tasks import consume_kafka_messages consume_kafka_messages.delay() ``` --- ### 总结 - **简单场景**:使用自定义`AppConfig` + 守护线程。 - **生产环境**:建议通过独立进程(如`manage.py`自定义命令)运行消费者,结合Supervisor或Docker管理。 - **高可靠性需求**:集成Celery或Kafka的官方多线程/进程消费API。
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值