Celery介绍和使用
一.Celery介绍:
一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。 Celery是一个功能完备即插即用的任务队列
单个 Celery 进程每分钟可处理数以百万计的任务。通过消息进行通信,使用消息队列(broker)在客户端和消费者之间进行协调。
安装Celery:
$ pip install -U Celery
Celery官方文档:https://docs.celeryq.dev/en/latest/index.html
1. 生产者消费者设计模式
最常用的解耦方式之一,寻找中间人(broker)搭桥,保证两个业务没有直接关联。
我们称这一解耦方式为:生产者消费者设计模式
总结:
- 生产者生成消息,缓存到消息队列中,消费者读取消息队列中的消息并执行。
- 由生产者生成发送短信消息,缓存到消息队列中,消费者读取消息队列中的发送短信消息并执行。
2.中间人broker
示例:此处演示Redis数据库作为中间人broker
Celery需要一种解决消息的发送和接受的方式,我们把这种用来存储消息的的中间装置叫做message broker, 也可叫做消息中间人。
作为中间人,我们有几种方案可选择:
2.1.RabbitMQ
RabbitMQ是一个功能完备,稳定的并且易于安装的broker. 它是生产环境中最优的选择。
使用RabbitMQ的细节参照以下链接:http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq
如果使用的是Ubuntu或者Debian发行版的Linux,可以直接通过命令安装RabbitMQ:
sudo apt-get install rabbitmq-server
安装完毕之后,RabbitMQ-server服务器就已经在后台运行。
如果用的并不是Ubuntu或Debian, 可以在以下网址:
http://www.rabbitmq.com/download.html
去查找自己所需要的版本软件。
2.2.Redis
Redis也是一款功能完备的broker可选项,但是其更可能因意外中断或者电源故障导致数据丢失的情况。
关于是由那个Redis作为Broker,可访下面网址: http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis
2.3. celery框架伪代码
# 队列,中间人
class Broker(object):
# 任务队列
broker_list = []
# 消费者
class Worker(object):
# 任务执行者
def run(self, broker, func):
if func in broker.broker_list:
func()
else:
return 'error'
# Celery 将这3者 串联起来了
class Celery(object):
def __init__(self):
self.broker = Broker()
self.worker = Worker()
def add(self, func):
self.broker.broker_list.append(func)
def work(self, func):
self.worker.run(self.broker,func)
# 任务(函数),生产者
def send_sms_code():
print('send_sms_code')
# 1.创建celery实例
app=Celery()
# 2. 添加任务
app.add(send_sms_code)
# 3.执行任务
app.work(send_sms_code)
二. 配合Flask框架的基本使用
1.首先需要确定接口,即需要实现的功能接口并确定其接口名称
2.在main.py文件中定义接口,实现路由的使用
@app.route("/test", methods=['post'])
def test():
func()
return "hello world"
3.在task.py文件中定义需要调用的函数func
celery配置目录结构
4.使用
1.celery_worker.py
import os
import sys
from celery import Celery
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # 设置 BASE_DIR
sys.path.append(BASE_DIR)
from configure.server_config import logger
from configure.configure import config
HOST = config['redis']['host']
PORT = config['redis']['port']
DB = config['celery']['db']
BROKER = f"redis://{HOST}:{PORT}/{DB}"
BACKEND = f"redis://{HOST}:{PORT}/{DB}"
# 配置 Celery
celery = Celery('celery_tasks',
backend=BACKEND,
broker=BROKER)
celery.conf.broker_connection_retry_on_startup = True
# celery.config_from_object('celery_tasks.celery_config')
# 其他配置
celery.conf.update(
accept_content=['json'],
task_serializer='json',
result_serializer='json',
task_time_limit=300, # 单个任务最大运行时间(秒)
max_tasks_per_child=100, # 每个 worker 重启前的任务数
)
# 自动发现任务 celery_tasks.aaa -> celery_tasks 目录下的问价夹
celery.autodiscover_tasks(['celery_tasks.aaa', 'celery_tasks.bbb'])
logger.info(f'Celery started Broker -> [{BROKER}]....:')
2.在task.py中定义任务函数
@celery.task
def demo_api_task():
print("haha")
return "hahaha"
3.使用异步任务函数
@app.route('/celery_task_test', methods=['GET'])
def celery_task_test():
from celery_tasks.aaa.tasks import demo_api_task
res = demo_api_task.delay().get()
logger.info(f'res -> {res}')
return res
到此结束,celery接口实现完成
接下来启动celery任务
4.在虚拟机上进入虚拟环境,work on env_name
进入虚拟机上存放的main.py文件下启动gunicorn
guniron -w 6 --log-level=debug main:app
进入task.py文件目录下启动work
celery -A task.celery work -l info
windows 启动命令
windows环境必须安装:pip install eventlet
celery -A celery_worker worker -l info -P eventlet
至此,celery基本调用方法使用完成