一、介绍
Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。它是一个任务队列,专注于实时处理,同时还支持任务调度。
celery 的优点
- 简单:celery的 配置和使用还是比较简单的, 非常容易使用和维护和不需要配置文件
- 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
- 如果连接丢失或发生故障,worker和client 将自动重试,并且一些代理通过主/主或主/副本复制方式支持HA。
- 快速:一个单进程的celery每分钟可处理上百万个任务
- 灵活: 几乎celery的各个组件都可以被扩展及自定制
celery 可以做什么
- 异步发邮件 , 一般发邮件比较耗时的操作,需要及时返回给前端,这个时候 只需要提交任务给celery 就可以了.之后 由worker 进行发邮件的操作 .
- 比如有些 跑批接口的任务,需要耗时比较长,这个时候 也可以做成异步任务 .
- 定时调度任务等
celery 的核心模块
- Task:就是任务,有异步任务和定时任务
- Broker:中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
- Worker:执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
- Beat: 定时任务调度器,根据配置定时将任务发送给Broker。
- Backend: 用于存储任务的执行结果
二、celery实现异步任务
1、入门案例
1、编写celery服务执行的任务【celery_task.py】
import celery import time # redis异步队列 BROKER_URL = 'redis://localhost:6379/1' # 结果存储至redis CELERY_RESULT_BACKEND = 'redis://localhost:6379/2' app = celery.Celery('test', backend=CELERY_RESULT_BACKEND, broker=BROKER_URL) # 往celery中注册任务 @app.task def send_email(name): print("给{}发送了一封邮件"。format(name)) time.sleep(2) print("邮件发送完毕") @app.task def send_msg(name): print("给{}发送了一条短信"。format(name)) time.sleep(2) print("短信发送完毕")
2、启动celery服务
celery -A celery_task worker -l info
- 注意点:window下要加参数
-P eventlet
,指定为协程模式并发celery -A celery_task worker -l info -P eventlet
- 更多参数说明
-A :指定celery服务的执行任务
-l:指定日志等级
-P: 并发模式( prefork (默认multiprocessing), eventlet, gevent, threads.
-c: –concurrency=10, 并发级别,prefork 模型下就是子进程数量,默认等于 CPU 核心数
3、编写往selery提交任务的代码【submit_task.py】
from celery_task import send_email,send_msg result = send_email.delay("小柠") # 获取执行结果的id print(result.id) result2 = send_msg.delay("小柠") # 获取执行结果的id print(result2.id)
4、执行submit_task.py
- 运行上述文件,就可以看到celery服务中执行我们提交的任务
5、通过执行结果id获取执行结果
from celery_task import send_email,send_msg,app # 提交任务 result = send_email.delay("yuan") # 等待任务执行完毕获取结果 from celery.result import AsyncResult async_result = AsyncResult(id=result.id, app=app) if async_result.successful(): result = async_result.get() print(result) elif async_result.failed(): print('执行失败') elif async_result.status == 'PENDING': print('任务等待中被执行') elif async_result.status == 'RETRY': print('任务异常后正在重试') elif async_result.status == 'STARTED': print('任务已经开始被执行')
2、多任务结构
如果要在多个任务模块中定义celery的任务函数,应该怎么去实现呢?
1、项目结构
2、
celery.py
代码如下from celery import Celery app = Celery('celery_demo', broker='redis://redis服务host地址:6379/1', backend='redis://redis服务host地址:6379/2', # celery会在指定的模块中查找任务函数 include=['celery_app.celery_tasks.task01', 'celery_app.celery_tasks.task02' ]) # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False
3、
task01.py
from celery_app.celery_tasks.celery import app import time @app.task def send_email(name): print("给{}发送了一封邮件"。format(name)) time.sleep(2) print("邮件发送完毕")
4、
task02.py
from celery_app.celery_tasks.celery import app import time @app.task def send_msg(name): print("给{}发送了一条短信"。format(name)) time.sleep(2) print("短信发送完毕")
5、开启celery服务
celery worker -A celery_tasks -l info -P eventlet
1
三、celery实现定时任务
1、简单定时任务调度
from celery_task import send_email, send_msg from datetime import datetime from datetime import timedelta # 方式一:固定的某一个时间执行 # t = datetime(2022, 3, 4, 18, 20, 00) # t = datetime.utcfromtimestamp(t.timestamp()) # result = send_email.apply_async(args=["小柠",], eta=t) # print(result.id) # 方式二:当前之后之后多久执行 ctime = datetime.now() # 默认用utc时间 # 获取当前的时间 t2 = datetime.utcfromtimestamp(ctime.timestamp()) #当前事件加10秒 task_time = t2 + timedelta(seconds=10) # 使用apply_async并设定时间执行 result = send_email.apply_async(args=["小柠"], eta=task_time) print(result.id)
2、多目录下定时任务调度
from datetime import timedelta from celery import Celery app = Celery('celery_demo', broker='re dis://redis服务host地址:6379/1', backend='redis://redis服务host地址:6379/2', # celery会在指定的模块中查找任务函数 include=['celery_app.celery_tasks.task01', 'celery_app.celery_tasks.task02' ]) # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False # 设置定时任务调度器 app.conf.beat_schedule = { # 名字随意命名 # 'run_test_case': { # 'task': 'celery_tasks.task01.send_email', # 每年5月1号,8点4分执行 # 'schedule': crontab(minute=4, hour=8, day_of_month=1, month_of_year=5), # 'args': ('张三',) # }, }
1、运行celery服务(执行任务)
celery -A celery_tasks worker -l info -P eventlet # 或者 -P threads
2、运行定时任务(插入任务)
celery -A celery_tasks beat
- 注意点:如果先启动插入任务的程序,celery会根据设置的定时规则,往执行队列中插入任务,如果没有执行,任务则会累计在队列中,等work程序一启动会批量执行。
欢迎来到testingpai.com!
注册 关于