【Flask+Celery】- 分布式任务工作使用流程

本文详细介绍Celery任务队列的原理与应用,包括生产者消费者模式、broker选择(如RabbitMQ、Redis),以及如何在Flask框架中集成Celery进行异步任务处理。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

Celery介绍和使用

一.Celery介绍:

在这里插入图片描述
一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。 Celery是一个功能完备即插即用的任务队列
单个 Celery 进程每分钟可处理数以百万计的任务。通过消息进行通信,使用消息队列(broker)在客户端和消费者之间进行协调。
安装Celery:

$ pip install -U Celery

Celery官方文档:https://docs.celeryq.dev/en/latest/index.html

1. 生产者消费者设计模式

最常用的解耦方式之一,寻找中间人(broker)搭桥,保证两个业务没有直接关联。
我们称这一解耦方式为:生产者消费者设计模式
在这里插入图片描述
总结:

  • 生产者生成消息,缓存到消息队列中,消费者读取消息队列中的消息并执行。
  • 由生产者生成发送短信消息,缓存到消息队列中,消费者读取消息队列中的发送短信消息并执行。

2.中间人broker

示例:此处演示Redis数据库作为中间人broker
Celery需要一种解决消息的发送和接受的方式,我们把这种用来存储消息的的中间装置叫做message broker, 也可叫做消息中间人。
作为中间人,我们有几种方案可选择:

2.1.RabbitMQ

RabbitMQ是一个功能完备,稳定的并且易于安装的broker. 它是生产环境中最优的选择。

使用RabbitMQ的细节参照以下链接:http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq

如果使用的是Ubuntu或者Debian发行版的Linux,可以直接通过命令安装RabbitMQ:

sudo apt-get install rabbitmq-server

安装完毕之后,RabbitMQ-server服务器就已经在后台运行。

如果用的并不是Ubuntu或Debian, 可以在以下网址:
http://www.rabbitmq.com/download.html
去查找自己所需要的版本软件。

2.2.Redis

Redis也是一款功能完备的broker可选项,但是其更可能因意外中断或者电源故障导致数据丢失的情况。
关于是由那个Redis作为Broker,可访下面网址: http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis

2.3. celery框架伪代码

# 队列,中间人
class Broker(object):
    # 任务队列
    broker_list = []

# 消费者
class Worker(object):
    # 任务执行者

    def run(self, broker, func):
        if func in broker.broker_list:
            func()
        else:
            return 'error'

# Celery 将这3者 串联起来了
class Celery(object):
    def __init__(self):
        self.broker = Broker()
        self.worker = Worker()

    def add(self, func):
        self.broker.broker_list.append(func)

    def work(self, func):
        self.worker.run(self.broker,func)


# 任务(函数),生产者
def send_sms_code():
    print('send_sms_code')

# 1.创建celery实例
app=Celery()
# 2. 添加任务
app.add(send_sms_code)
# 3.执行任务
app.work(send_sms_code)

二. 配合Flask框架的基本使用

1.首先需要确定接口,即需要实现的功能接口并确定其接口名称

2.在main.py文件中定义接口,实现路由的使用

@app.route("/test", methods=['post'])
def test():
	func()
	return "hello world"

3.在task.py文件中定义需要调用的函数func
celery配置目录结构
在这里插入图片描述

4.使用

1.celery_worker.py

import os
import sys
from celery import Celery

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))  # 设置 BASE_DIR
sys.path.append(BASE_DIR)
from configure.server_config import logger
from configure.configure import config

HOST = config['redis']['host']
PORT = config['redis']['port']
DB = config['celery']['db']
BROKER = f"redis://{HOST}:{PORT}/{DB}"
BACKEND = f"redis://{HOST}:{PORT}/{DB}"

# 配置 Celery
celery = Celery('celery_tasks',
                backend=BACKEND,
                broker=BROKER)
celery.conf.broker_connection_retry_on_startup = True
# celery.config_from_object('celery_tasks.celery_config')
# 其他配置
celery.conf.update(
    accept_content=['json'],
    task_serializer='json',
    result_serializer='json',
    task_time_limit=300,  # 单个任务最大运行时间(秒)
    max_tasks_per_child=100,  # 每个 worker 重启前的任务数
)
# 自动发现任务  celery_tasks.aaa -> celery_tasks 目录下的问价夹
celery.autodiscover_tasks(['celery_tasks.aaa', 'celery_tasks.bbb'])
logger.info(f'Celery started Broker -> [{BROKER}]....:')

2.在task.py中定义任务函数

@celery.task
def demo_api_task():
    	print("haha")
    	return "hahaha"

3.使用异步任务函数

@app.route('/celery_task_test', methods=['GET'])
def celery_task_test():
    from celery_tasks.aaa.tasks import demo_api_task
    res = demo_api_task.delay().get()
    logger.info(f'res -> {res}')
    return res

到此结束,celery接口实现完成
接下来启动celery任务
4.在虚拟机上进入虚拟环境,work on env_name
进入虚拟机上存放的main.py文件下启动gunicorn

guniron -w 6 --log-level=debug main:app

进入task.py文件目录下启动work

 celery -A task.celery work -l info

windows 启动命令

windows环境必须安装:pip install eventlet

celery -A celery_worker worker -l info -P eventlet

至此,celery基本调用方法使用完成

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

天下·第二

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值