Django的celery配置(包括定时任务、队列)

摘要:
'print'='*40七、启动celery定时任务得先启动第六节中介绍的命令后,再执行这个命令,定时任务才能执行。因为beat只是分配定时任务给celery的worker,所以只有worker启动后,定时任务才能异步执行。
一、安装celery

Django项目不需要安装celery这个包,可以直接使用django-celery这个包,,先来安装它,在终端中输入:

pip install django-celery
二、安装rabbitmq,建立celery队列

我做的项目用的就是rabbitmq,按道理来说,也是可以用redis作为消息队列的,但是rabbitmq更好,此处不做详细解释,有兴趣的同学的可以去研究下。
ubuntu环境下,在终端中输入:

sudo apt-get install rabbitmq-server
三、配置settings.py

首先要在INSTALLED_APPS中添加对djcelery的引用

INSTALLED_APPS = [
    'decelery',
]

再在settings.py中添加以下代码

import djcelery
djcelery.setup_loader()

BROKER_URL = 'amqp://guest:guest@localhost:5672/'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_IMPORTS = (
    "common.tasks.task1",
    "common.tasks.task2",
)
四、添加celery异步任务

在common/tasks/task1.py中添加以下代码,就定义了celery异步任务。
因为celery任务可能会很多,为了便于管理,我们就在项目下的common/tasks文件夹中创建了许多task.py文件,如task1.py、task2.py等。
在task1.py中添加以下代码:

from celery.task import task

@task
def function1(a, b):
    print a + b

注意,想让这个celery任务能执行,还必须要在settings.py中加一段配置,这个在上面已经添加了,这里再特别提醒下,如下:

# common、tasks是文件夹,task1、task2是tasks文件夹下面的py文件
# CELERY_IMPORTS:是导入目标任务文件
CELERY_IMPORTS = (
    "common.tasks.task1",
    "common.tasks.task2",
)

要调用这个异步任务的话,就用

from common.tasks.task1 import function

function.delay(a, b)

是不是很方便?工具的好处就是把事情简单化,但是太依赖工具,不懂的底层原理的话容易把人搞傻。
比如其实还可以用threading来创建新线程来执行异步任务,此处不再赘述,否则又是一套长篇大论,有兴趣的童鞋可以去学习一下如何使用threading库。

五、启动celery

因为我们这里用的是django-celery,而不是直接使用celery这个库,所以启动celery的命令与celery官网里面介绍的是不一样的,新人很可能因为这个掉进坑里,所以这里我特别提醒一下。
终端命令如下:

# 先启动服务器
python manage.py runserver
# 再启动worker(--concurrency=2是开4个worker进程,不加也可以启动,只不过在生产环境还是应该多开几个进程的)
python manage.py celery worker --concurrency=2 -l info

六、celery定时任务的配置

在项目中有时不仅仅使用以上的异步任务,有时候需要创建很多定时任务,这样celery又可以大显身手了。在settings.py中添加以下配置,就可以添加定时任务

from celery.schedules import crontab

# 下方的common和tasks依然是文件夹
# function2、function3分别是tasks文件夹中的task1.py、task2.py文件的函数的函数名
CELERYBEAT_SCHEDULE = {
    'function2': {
        'task': 'common.tasks.task1',
        'schedule': crontab(minute='*/50'), # 每50分钟执行一次
    },
    'function3': {
        'task': 'common.tasks.task2',
        'schedule': crontab(minute=0, hour='8,13'), # 每天的8点0分和13点0分各执行一次
    },
}

下面是common/tasks/task1.py中的函数

from celery.task import task

@task
def function2():
    print '='*40
    print 'This is function2, celery is great!'
    print '='*40

下面是common/tasks/task2.py中的函数,跟task1.py中是一样的使用方法。

from celery.task import task

@task
def function3():
    print '='*40
    print 'This is function3, celery is great!'
    print 'Fuck celery start failure!'
    print '='*40
七、启动celery定时任务

得先启动第六节中介绍的命令后,再执行这个命令,定时任务才能执行。因为beat只是分配定时任务给celery的worker,所以只有worker启动后,定时任务才能异步执行。
顾名思义,worker就是干苦力的民工,累活脏或都给它干。beat可以形象的理解为工地做计划的人,到了要干活的时候就分配任务给民工,可能是包工头,也可能是一般的管理工程进度的小弟。反正都是苦命的人,屌丝何必难为屌丝。 (╥╯^╰╥)
终端命令如下:

python manage.py celery beat -l info

八、celery队列的配置

在项目中celery的异步任务很多的时候,这个时候我们就需要将不同的任务分配到不同的队列(queue)中去执行,如果只有一个默认队列的话,所有异步任务都会在这个队列中执行(是需要排队的,先来的先执行),任务很多的时候,就没法同时执行很多任务了,甚至造成任务的拥堵。将不同的任务分配到不同的队列就可以保证同一时刻可以同时运行不同队列中的任务,互补干扰,并且每个队列可以单独开好几个进程。
进程数最好不要超过CPU的核数,因为CPU只有4个核的话,你开5个进程,同一时间还是只能执行4个进程。

我们可以在项目中设置三个队列(default, frontend, backend),队列的名字可以自己任意取。以下是在settings.py中添加队列的配置:

from kombu import Exchange, Queue

# 默认队列是default
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'

# x-priority是任务的优先级
# 优先级就是哪个队列优先执行,比较紧迫的需要马上执行的任务优先级可以设置为最高
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default', consumer_arguments={'x-priority': 5}),
    Queue('frontend', Exchange('frontend'), routing_key='frontend', consumer_arguments={'x-priority': 10}),
    Queue('backend', Exchange('backend'), routing_key='backend', consumer_arguments={'x-priority': 8}),
)

# 特别需要注意的是,异步任务的路径必须精确到函数名(比如下方的common、tasks是文件夹,task1、task2是py文件,function1就是task1.py中的定义的异步任务的函数名),不然的话异步任务就没法执行
CELERY_ROUTES = {
    "common.tasks.task1.function1": {'queue': "frontend", 'routing_key': 'frontend'},
    "common.tasks.task1.function2": {'queue': "backend", 'routing_key': 'backend'},
    "common.tasks.task2.function3": {'queue': "default", 'routing_key': 'default'},
}
九、启动celery队列

配置了队列的话,如果执行python manage.py celery worker --concurrency=2 -l info的话就只会创建一个默认的队列,而我们需要创建多个队列,这样我们就不需要运行这个命令了,我们需要在终端中分别运行以下命令:

# -Q 后面加的是配置的队列名,concurrency(进程数)设置为几就由自己定了,只要不超过CPU核数就行了
python manage.py celery worker -l info -Q default --concurrency=1
python manage.py celery worker -l info -Q frontend --concurrency=2
python manage.py celery worker -l info -Q backend --concurrency=4
十、补充

Django下要查看其他celery的命令,包括参数配置、启动多worker进程的方式都可以通过python manage.py celery --help来查看,一下是终端输入命令后出来的提示信息:

Usage: manage.py celery <command> [options] 

Show help screen and exit.

Options:
  -A APP, --app=APP     app instance to use (e.g. module.attr_name)
  -b BROKER, --broker=BROKER
                        url to broker.  default is 'amqp://guest@localhost//'
  --loader=LOADER       name of custom loader class to use.
  --config=CONFIG       Name of the configuration module
  --workdir=WORKING_DIRECTORY
                        Optional directory to change to after detaching.
  -C, --no-color        
  -q, --quiet           
  --version             show program's version number and exit
  -h, --help            show this help message and exit

---- -- - - ---- Commands- -------------- --- ------------

+ Main: 
|    celery worker
|    celery events
|    celery beat
|    celery shell
|    celery multi
|    celery amqp

+ Remote Control: 
|    celery status
 
|    celery inspect --help
|    celery inspect active
|    celery inspect active_queues
|    celery inspect clock
|    celery inspect conf None
|    celery inspect memdump
|    celery inspect memsample
|    celery inspect objgraph None
|    celery inspect ping
|    celery inspect registered
|    celery inspect report
|    celery inspect reserved
|    celery inspect revoked
|    celery inspect scheduled
|    celery inspect stats
 
|    celery control --help
|    celery control add_consumer <queue> [exchange [type [routing_key]]]
|    celery control autoscale [max] [min]
|    celery control cancel_consumer <queue>
|    celery control disable_events
|    celery control enable_events
|    celery control pool_grow [N=1]
|    celery control pool_shrink [N=1]
|    celery control rate_limit <task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>
|    celery control time_limit <task_name> <soft_secs> [hard_secs]

+ Utils: 
|    celery purge
|    celery list
|    celery migrate
|    celery call
|    celery result
|    celery report
---- -- - - --------- -- - -------------- --- ------------

Type 'celery <command> --help' for help using a specific command.

免责声明:文章转载自《Django的celery配置(包括定时任务、队列)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇【快应用篇01】快应用它来了!带你了解什么是快应用!五 数据组织模式 (重组数据) 1 分层结构模式下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Django(七)

一、ModelForm操作及验证 1、class Meta:class Meta: #注意以下字段不能加逗号 model = models.UserInfo #这里的all代指所用的字段,也可以是一个列表,里面是model中的字段 fields = '__all__' # fields = ['username','user_type...

Socket通讯-C#客户端与Java服务端通讯(发送消息和文件)

设计思路 使用websocket通信,客户端采用C#开发界面,服务端使用Java开发,最终实现Java服务端向C#客户端发送消息和文件,C#客户端实现语音广播的功能。 Java服务端设计 package servlet.websocket; import java.io.IOException; import java.util.Map; imp...

华三SNMP配置详解

一、SNMP配置 1.1  SNMP简介 SNMP(Simple Network Management Protocol,简单网络管理协议)是网络中管理设备和被管理设备之间的通信规则,它定义了一系列消息、方法和语法,用于实现管理设备对被管理设备的访问和管理。SNMP具有以下优势:   自动化网络管理。网络管理员可以利用SNMP平台在网络上的节点检索信息、修...

1、springcloud简介

摘要: springcloud是一系列框架的有序集合。它利用springboot的开发便利性巧妙地简化了分布式系统基础设施的开发,如服务发现注册、配置中心、消息总线、负载均衡、断路器、数据监控等,都可以用springboot的开发风格做到一键启动和部署。 springcloud并没有重复制造轮子,它只是将目前各家公司开发的比较成熟、经得起实际考验的服务框架...

前端加密MD5

今天接触了MD5加密方式,记录一下使用方法,又去搜了搜关于MD5的详细内容   MD5在vue中使用方法 1、下载MD5模块 cnpm install md5 -S 2、引入模块 const md5 = require("md5") 3、加密 const str = "12345"; console.log(md5(str)...

SendMessageTimeout 的使用

在WINDOW编程中,发送消息的常用API有SendMessage,PostMessage,PostThreadMessage。 一般每个线程有两个队列:一个用来接收通过Send函数的消息,另外一个队列接收通过Post函数的消息。该两个函数的基本区别是:一个函数需要等待返回的,相当于函数调用,这个是SendMessage;另外一个是将消息放到对方的队列中,...