项目需要对一些观测数值做监控预警,没有找到特别合适的专门的开源项目,因此决定采用Celery实现任务调度,当然,除了Cerlery,还有就是可以采用APScheduler实现,也有django-apscheduler的开源项目利用Django的ORM存储APScheduler的任务和调度信息,但是APScheduler并没有那么灵活,比如要想修改例行信息,是需要修改源代码并且重启Django进程才可以,也正是参考了这个Stackoverflow问题,才决定选用Celery来调度任务。

关于Celery的定义,我想其项目网站主页的标题最为精确不过了——Distributed Task Queue,这里只简单的介绍一下Celery的工作模式,其他的使用细节可以参看网上的大量文章,不过建议以项目本身的文档为主,最为权威,尤其是在现在网络文章互相抄来抄去,质量极低!

Celery是一个分布式任务队列,那么就需要一个Broker也就是经纪人(我看好多人翻译成中间人,觉得好low啊)来存储和分发任务消息,Worker是实际执行任务的角色,Worker进程启动后,从经纪人那里读取并执行相应任务,任务是一早定义好的,可以手动调用,也可以用CelerBeat实现例行。如果了解生产者消费者模型的话,Worker对应消费者,CeleryBeat对应生产者。

也已经有开源项目django-celery,使得可以很方便的在Django中添加Celery任务,但是这个项目目前不支持Django 2及以上,原因是Celery调用的kombu库会抛出一个missing 1 required positional argument: on_delete的错误,而恰是在Django 2的时候,on_delete改成了必须字段,由于是Docker配置的开发环境,也懒得去修改镜像,因为Celery 本身就有对Django的支持,所以就自己动手,并不复杂。

具体的过程可以参考Using Django 2 with Celery and Redis 和 官方文档Using Celery with Django,很简单就不上代码了,这里有一个很深的坑,celery.py中配置了任务自动发现app.autodiscover_tasks(),但在各个app中定义任务的模块名一定要是tasks,也就是示例代码中定义了任务的文件名一定要是tasks.py,而不能是其他的任何名称,因为自己尝试的时候手残敲错了,调试了很久才发现!

1
2
3
4
5
6
- app1/
- tasks.py
- models.py
- app2/
- tasks.py
- models.py

另外还有一个需要注意的问题就是调用任务时任务的name该如何写,从哪一层模块开始也至关重要,task-naming-relative-imports解释了why and how,这一点很多其他文章都没提及,也是容易出现的一个坑。具体放在Django中,以app1中任务为例,也就是在INSTALLED_APPS中添加的是app1,那么调用的时候要用到的任务名称就是app1.tasks.task1,也可以是嵌套的类如app1.app2.tasks.task1

写了一个简单的django2-celery-demo,以供参考。

参考

[1]. Stackoverflow: How to setup APScheduler in a Django project?
[2]. django-celery
[3]. django-apscheduler
[4]. Using Django 2 with Celery and Redis
[5]. Using Celery with Django
[6]. task-naming-relative-imports