python 异步任务框架 Celery 入门

Celery

1. 简介

Celery 是使用 python 编写的分布式任务调度框架。

它有几个主要的概念:

版本要求

Celery5.1 要求:

Celery 是一个资金最少的项目,所以我们不支持 Microsoft Windows。

更多更详细的版本要求见官方文档

安装

使用 pip 安装:

pip install -U Celery

捆绑包

Celery 还定义了一组包,用于安装 Celery 和给定的依赖项。

可以在 pip 命令中实现中括号来指定这些依赖项。

pip install "celery[librabbitmq]"

pip install "celery[librabbitmq,redis,auth,msgpack]"

具体支持的依赖包见官方文档

2. 简单使用

1. 选择一个 broker

使用 celery 首先需要选择一个消息队列。安装任意你熟悉的前面提到的 celery 支持的消息队列。

2. 编写一个 celery 应用

首先我们需要编写一个 celery 应用,它用来创建任务和管理 wokers,它要能够被其他的模块导入。

创建一个 tasks.py 文件:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

第一个参数 tasks 是当前模块的名称,它可以省略,建议以当前模块名为名称。

第二个关键字参数 broker='redis://localhost:6379/0' 指定我们使用 Redis 作为消息队列,并指定连接地址。

3.运行 celery 的 worker 服务

cd 到 tasks.py 所在目录,然后运行下面的命令来启动 worker 服务

celery -A tasks worker --loglevel=INFO

4. 调用任务

>>> from tasks import add
>>> add.delay(4,4)

通过调用任务的 delay 来执行对应的任务。celery 会把执行命令发送到 broker,broker 再将消息发送给 worker 服务来执行,如果一切正常你将会在 worker 服务的日志中看到接收任务和执行任务的日志。

5. 保存结果

如果你想要跟踪任务的状态以及保存任务的返回结果,celery 需要把它发送到某个地方。celery 提供多种结果后端。

我们这里以 reids 为例,修改 tasks.py 中的代码,添加一个 Redis 后端。

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

更多结果后端见官方文档

重新启动 worker 服务,重新打开 python 解释器

>>> from tasks import add
>>> result = add.delay(4,4) 

ready() 方法返回任务是否执行完成:

>>> result.ready()
False

还可以等待结果完成,但很少使用这种方法,因为它将异步调用转换为同步调用

>>> result.get(timeout=1)
8

3. 在应用中使用 celery

创建项目

项目结构:

proj/__init__.py
    /celery.py
    /tasks.py

proj/celery.py

from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1',
             include=['proj.tasks'])

# 配置
app.conf.update(
    result_expires=3600, # 结果过期时间
)

在这个模块中我们创建了一个 Celery 模块。要在你的项目中使用 celery 只需要导入此实例。

proj/tasks.py

from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

启动 worker

celery -A proj worker -l INFO

调用任务

>>> from proj.tasks import add

>>> add.delay(2, 2)

4. 在 django 中使用 celery

要在你的 django 项目中使用 celery,首先需要定义一个 Celery 的实例。

如果你又 django 项目如下:

- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py

那么推荐的方法是创建一个新的 proj/proj/celery.py 模块来定义芹菜实例:

file:proj/proj/celery.py

import os

from celery import Celery

# 为`celery`设置默认的django设置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# 设置配置来源
app.config_from_object('django.conf:settings', namespace='CELERY')

# 加载所有的已注册django应用中的任务
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

然后你需要在你的 proj/proj/__init__.py 模块中导入这个应用程序。这样就可以保证 Django 启动时加载应用程序,以便于 @shared_task 装饰器的使用。

proj/proj/__init__.py:

from .celery import app as celery_app

__all__ = ('celery_app',)

请注意,此示例项目布局适用于较大的项目,对于简单的项目,可以使用包含定义应用程序和任务的单个模块。

接下来我们来解释一下 celery.py 中的代码,首先,我们设置 celery 命令行程序的环境变量 DJANGO_SETTINGS_MODULE 的默认值:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

这一行的作用是加载当前 django 项目的环境设置,特别是当需要在异步任务中用到 ORM。它必须在创建应用程序实例之前。

app = Celery('proj')

我们还添加了 Django 设置模块作为 Celery 的配置源。这意味着我们不必使用多个配置文件,而是直接在 Django 的配置文件中配置 Celery。

app.config_from_object('django.conf:settings', namespace='CELERY')

大写命名空间意味着所有 Celery配置项 必须以大写指定,并以 CELERY_ 开头,因此例如 broker_url 设置变为 CELERY_BROKER_URL

例如,Django 项目的配置文件可能包括:

settings.py

CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30*60

接下来,可重用应用程序的常见做法是在单独的 tasks.py 模块中定义所有任务,Celery 有一种方法可以自动发现这些模块:

app.autodiscover_tasks()

使用上面的行,Celery 将按照 tasks.py 约定自动从所有已安装的应用程序中发现任务:

- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py

这样就不必手动将各个模块添加到 CELERY_IMPORTS 设置中。

使用 @shared_task 装饰器

我们编写的任务可能会存在于可重用的应用程序中,而可重用的应用程序不能依赖与项目本身,因此无法直接导入 celery 应用实例。

@shared_task 装饰器可以让我们无需任何具体的 celery 实例创建任务:

demoapp/tasks.py

# Create your tasks here

from demoapp.models import Widget

from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)


@shared_task
def count_widgets():
    return Widget.objects.count()


@shared_task
def rename_widget(widget_id, name):
    w = Widget.objects.get(id=widget_id)
    w.name = name
    w.save()
回帖
请输入回帖内容 ...