高大上的 Celery 快速入门

本贴最后更新于 1024 天前,其中的信息可能已经时移世改

一、介绍

Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。它是一个任务队列,专注于实时处理,同时还支持任务调度。

celery 的优点

  • 简单:celery的 配置和使用还是比较简单的, 非常容易使用和维护和不需要配置文件
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
  • 如果连接丢失或发生故障,worker和client 将自动重试,并且一些代理通过主/主或主/副本复制方式支持HA。
  • 快速:一个单进程的celery每分钟可处理上百万个任务
  • 灵活: 几乎celery的各个组件都可以被扩展及自定制

celery 可以做什么

  • 异步发邮件 , 一般发邮件比较耗时的操作,需要及时返回给前端,这个时候 只需要提交任务给celery 就可以了.之后 由worker 进行发邮件的操作 .
  • 比如有些 跑批接口的任务,需要耗时比较长,这个时候 也可以做成异步任务 .
  • 定时调度任务等

celery 的核心模块

  • Task:就是任务,有异步任务和定时任务
  • Broker:中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
  • Worker:执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
  • Beat: 定时任务调度器,根据配置定时将任务发送给Broker。
  • Backend: 用于存储任务的执行结果

二、celery实现异步任务

1、入门案例

1、编写celery服务执行的任务【celery_task.py】

import celery
import time

# redis异步队列
BROKER_URL = 'redis://localhost:6379/1'
# 结果存储至redis
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
app = celery.Celery('test', backend=CELERY_RESULT_BACKEND, broker=BROKER_URL)
# 往celery中注册任务
@app.task
def send_email(name):
   	print("给{}发送了一封邮件"format(name))
   	time.sleep(2)
   	print("邮件发送完毕")
   
@app.task
def send_msg(name):
	print("给{}发送了一条短信"format(name))
	time.sleep(2)
   	print("短信发送完毕")

2、启动celery服务

celery -A celery_task worker -l info 

  • 注意点:window下要加参数-P eventlet,指定为协程模式并发
celery -A celery_task worker -l info -P eventlet
  • 更多参数说明

    -A :指定celery服务的执行任务

    -l:指定日志等级

    -P: 并发模式( prefork (默认multiprocessing), eventlet, gevent, threads.

    -c: –concurrency=10, 并发级别,prefork 模型下就是子进程数量,默认等于 CPU 核心数

3、编写往selery提交任务的代码【submit_task.py】

from celery_task import send_email,send_msg

result = send_email.delay("小柠")
# 获取执行结果的id
print(result.id)

result2 = send_msg.delay("小柠")
# 获取执行结果的id
print(result2.id)

4、执行submit_task.py

  • 运行上述文件,就可以看到celery服务中执行我们提交的任务

5、通过执行结果id获取执行结果

from celery_task import send_email,send_msg,app
# 提交任务
result = send_email.delay("yuan")

# 等待任务执行完毕获取结果
from celery.result import AsyncResult
async_result = AsyncResult(id=result.id, app=app)
if async_result.successful():
 result = async_result.get()
 print(result)
elif async_result.failed():
 print('执行失败')
   elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

2、多任务结构

如果要在多个任务模块中定义celery的任务函数,应该怎么去实现呢?

1、项目结构

1646298226920.png

2、celery.py代码如下
from celery import Celery
   
   app = Celery('celery_demo',
             broker='redis://redis服务host地址:6379/1',
             backend='redis://redis服务host地址:6379/2',
             # celery会在指定的模块中查找任务函数
             include=['celery_app.celery_tasks.task01',
                   'celery_app.celery_tasks.task02'
                   ])

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
3、task01.py
from celery_app.celery_tasks.celery import app
import time


   @app.task
   def send_email(name):
   	print("给{}发送了一封邮件"format(name))
   	time.sleep(2)
	print("邮件发送完毕")
4、task02.py
from celery_app.celery_tasks.celery import app
import time


@app.task
   def send_msg(name):
   	print("给{}发送了一条短信"format(name))
   	time.sleep(2)
   	print("短信发送完毕")

5、开启celery服务

celery worker -A celery_tasks -l info -P eventlet

1

三、celery实现定时任务

1、简单定时任务调度

from celery_task import send_email, send_msg

from datetime import datetime
from datetime import timedelta
# 方式一:固定的某一个时间执行
# t = datetime(2022, 3, 4, 18, 20, 00)
# t = datetime.utcfromtimestamp(t.timestamp())
# result = send_email.apply_async(args=["小柠",], eta=t)
# print(result.id)

# 方式二:当前之后之后多久执行
ctime = datetime.now()
# 默认用utc时间
# 获取当前的时间
t2 = datetime.utcfromtimestamp(ctime.timestamp())
#当前事件加10秒
task_time = t2 + timedelta(seconds=10)

# 使用apply_async并设定时间执行
result = send_email.apply_async(args=["小柠"], eta=task_time)
print(result.id)

2、多目录下定时任务调度

from datetime import timedelta

from celery import Celery

app = Celery('celery_demo',
          broker='re
             dis://redis服务host地址:6379/1',
          backend='redis://redis服务host地址:6379/2',
          # celery会在指定的模块中查找任务函数
          include=['celery_app.celery_tasks.task01',
                   'celery_app.celery_tasks.task02'
                   ])

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 设置定时任务调度器
app.conf.beat_schedule = {
 # 名字随意命名
 # 'run_test_case': {
   #     'task': 'celery_tasks.task01.send_email',
   #     每年5月1号,8点4分执行
   #     'schedule': crontab(minute=4, hour=8, day_of_month=1, month_of_year=5),
   #     'args': ('张三',)
   # },
   }

1、运行celery服务(执行任务)

celery -A celery_tasks worker -l info  -P eventlet   # 或者 -P threads

2、运行定时任务(插入任务)

celery -A celery_tasks beat 
  • 注意点:如果先启动插入任务的程序,celery会根据设置的定时规则,往执行队列中插入任务,如果没有执行,任务则会累计在队列中,等work程序一启动会批量执行。
回帖
请输入回帖内容 ...