异步任务分发模块Celery

摘要:
Celery简介Celery是一个功能完备即插即用的任务队列。celery的特点是:简单,易于使用和维护,有丰富的文档。高效,单个celery进程每分钟可以处理数百万个任务。灵活,celery中几乎每个部分都可以自定义扩展。celery非常易于集成到一些web开发框架中。我们可以使用异步任务分发系统Celery解决上面的问题。Celery的概念、配置及使用任务队列是一种跨线程、跨机器工作的一种机制。

Celery简介

Celery是一个功能完备即插即用的任务队列。它使得我们不需要考虑复杂的问题,使用非常简单。 celery适用异步处理问题,当遇到发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。

celery的特点是:

  • 简单,易于使用和维护,有丰富的文档。
  • 高效,单个celery进程每分钟可以处理数百万个任务。
  • 灵活,celery中几乎每个部分都可以自定义扩展。

celery非常易于集成到一些web开发框架中。

使用场景简介

我们在做网站后端程序开发时,会碰到这样的需求:用户需要在我们的网站填写注册信息,我们发给用户一封注册激活邮件到用户邮箱,如果由于各种原因,这封邮件发送所需时间较长,那么客户端将会等待很久,造成不好的用户体验。

异步任务分发模块Celery第1张

我们可以使用异步任务分发系统Celery解决上面的问题。

我们将耗时任务放到后台异步执行。不会影响用户其他操作。除了注册功能,例如上传,图形处理等等耗时的任务,都可以按照这种思路来解决。 如何实现异步执行任务呢?我们可使用celery.。

celery除了刚才所涉及到的异步执行任务之外,还可以实现定时处理某些任务。

Celery的概念、配置及使用

异步任务分发模块Celery第2张

任务队列是一种跨线程、跨机器工作的一种机制。

任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理。

celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。

一个celery系统可以包含很多的worker和broker,可增强横向扩展性和高可用性能。

安装celery

直接使用pip包管理工具可以安装celery:

pip install celery

也可从官方直接下载安装包:https://pypi.python.org/pypi/celery/

tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python3 setup.py build
python3 setup.py install

Broker的选择

Celery需要一种解决消息的发送和接受的方式,我们把这种用来存储消息的的中间装置叫做message broker, 也可叫做消息中间人。

作为中间人,我们有几种方案可选择:

RabbitMQ

RabbitMQ是一个功能完备,稳定的并且易于安装的broker. 它是生产环境中最优的选择。使用RabbitMQ的细节参照以下链接:http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq

如果我们使用的是Ubuntu或者Debian发行版的Linux,可以直接通过下面的命令安装RabbitMQ: sudo apt-get install rabbitmq-server 安装完毕之后,RabbitMQ-server服务器就已经在后台运行。如果您用的并不是Ubuntu或Debian, 可以在以下网址:http://www.rabbitmq.com/download.html去查找自己所需要的版本软件。

Redis

Redis也是一款功能完备的broker可选项,但是其可能因意外中断或者电源故障导致数据丢失的情况。

关于是有那个Redis作为Broker,可访下面网址:http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis

存储结果

如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方。有几种保存的方案可选:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

Celery使用的一个简单示例

项目的目录结构如下:

异步任务分发模块Celery第3张

我们在tasks包中加入需要异步调用的任务,在task_demo.py文件中写具体的任务:

#-*- coding:utf-8 -*-
from celery importCelery
#定义celery对象
#123是我redis的密码;使用redis的1号库作为broker,2号库作为存取结果的地方
celery_app = Celery("demo1",
                    broker="redis://:123@127.0.0.1:6800/1",
                    backend="redis://:123@127.0.0.1:6800/2"
                    )
@celery_app.task
defprint_now(now):
    """异步任务的demo"""
    print(now)
    #定义一个返回值,如果不写的话会默认返回None
    return "当前时间:{}".format(now)

然后在外部的transfer.py文件中加入调用这个任务的代码:

#-*- coding:utf-8 -*-
importtime
importredis
from tasks.task_demo importprint_now
#创建redis连接,选择2号库
conn = redis.Redis(host="127.0.0.1",port=6800,password=123,db=2)
#使用celery异步任务
#delay函数调用后立即返回
now = time.strftime("%Y-%m-%d %X")
#把参数放在delay中!
ret = print_now.delay(now)
print(ret,type(ret)) # 7af04c3c-8070-4bf1-9563-b220fd5aac9c <class 'celery.result.AsyncResult'>

然后在项目的目录下启动celery:

异步任务分发模块Celery第4张

启动的结果及说明如下:

异步任务分发模块Celery第5张

Celery启动时如果上报AttributeError: async这个错误,建议把Celery的版本升级到4.1.1

启动成功后,终端会夯住;然后运行transfer.py文件,在终端会出现这样的提示:

异步任务分发模块Celery第6张

然后我们再利用redis可视化工具看看结果:

异步任务分发模块Celery第7张

我们可以看到,在执行完一次任务后,redis数据库1存放着celery的broker的信息,在库2中存放着执行的结果。

Django中使用Celery生成首页静态页面并将结果存在本地数据库中 *****

在实际中,我们网站的首页内容是不怎么变化的,如果每次请求首页都对数据库中的内容进行查询的话,这样对数据库来说负担会很大。

我们可以使用celery生成一下静态页面,当没有数据变化的时候用户每次请求主页我们都让他们去访问这个生成好的“首页”,有当数据变化的时候再重新生成一下这个“静态页面”就可以了。

项目的目录如下

异步任务分发模块Celery第8张

路由与视图如下

路由:

from django.contrib importadmin
from django.urls importpath
from demo importviews
urlpatterns =[
    path('admin/', admin.site.urls),
path('index/',views.Index.as_view()),
]

视图:

from django.views importView
from django.shortcuts importrender
from tasks.static_index importindex_static
classIndex(View):
    defget(self,request):
        #这里只是模拟一下后台是否有数据改动的情况
        #当数据有修改的时候需要进行判断,把修改后的数据传过去再重新进行页面的生成!
        #如果a=1表示后台没有数据修改
        a = 12
        if a == 1:
            return render(request, "index.html")
        #有数据修改了,就重新生成一下静态的页面
        else:
            index_static.delay()
            return render(request,"index.html")

将结果存储在本地数据库中的配置

此处需要用到额外包django_celery_results, 先安装包:

pip3 install django-celery-results

在celery_demo/settings.py中安装此应用:

INSTALLED_APPS =[
   xxx
    #自定义app或第三方app
    'demo.apps.DemoConfig',
    #celery用于存储结果的应用
    'django_celery_results',
    #celery用于定时任务的应用
    'django_celery_beat'
]

此时,tasks/static_index.py文件中的配置以及生成静态页面的代码如下:

importos
from celery importCelery
from django.conf importsettings
from django.template importloader,RequestContext
### 注意!必须引入Django环境!为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoCeleryDemo.settings')
## 创建应用
#定义celery对象
#123是我redis的密码;使用redis的1号库作为broker
celery_app = Celery("demo2")
## 配置应用
celery_app.conf.update(
    #使用redis的库1作为消息队列
    BROKER_URL="redis://:123@127.0.0.1:6800/1",
    #使用项目数据库存储任务执行结果
    CELERY_RESULT_BACKEND='django-db',
    #如过想把把结果存在redis的库2中这样配置:
    #CELERY_RESULT_BACKEND = "redis://:123@127.0.0.1:6800/2",
)
@celery_app.task
defindex_static():
    #这里可以使用ORM查询的结果等等
    context ={
            "msg":"这是一个基于Celery实现的首页静态页面"
        }
    #使用模板
    #加载模板文件,返回模板对象
    temp = loader.get_template("index_static.html")
    #模板渲染
    index_static_html =temp.render(context)
    #生成首页静态页面 —— 存放在static目录下
    index_static_html_path = os.path.join(settings.BASE_DIR,'templates','index.html')
    with open(index_static_html_path,'w')as f:
        f.write(index_static_html)
    #作为测试,这里返回一个字符串OK
    return "OK"

创建django_celery_results应用所需数据库表, 执行迁移文件:

这里需要注意,由于后续我们会用到admin页面,因此需要先迁移一下“默认注册应用auth”相关的数据库:

python3 manage.py migrate

然后迁移django_celery_results的表:

python3 manage.py migrate django_celery_results

启动celery

进入项目根目录,启动celery:

celery -A tasks.static_index worker -l info

异步任务分发模块Celery第9张

访问index页面查看结果

此时我们再访问index页面,可以从结果的表中看到result:

异步任务分发模块Celery第10张

Django中使用celery做定时任务 *****

如果我们想某日某时执行某个任务,或者每隔一段时间执行某个任务,也可以使用celery来完成。

使用定时任务,需要安装额外包:

pip3 install django_celery_beat

然后在settings.py中安装此应用:

INSTALLED_APPS =[
    xxx
    #自定义app或第三方app
    'demo.apps.DemoConfig',
    #celery用于存储结果的应用
    'django_celery_results',
    #celery用于定时任务的应用
    'django_celery_beat'
]

然后,tasks/static_index.py文件中的配置以及定时任务的代码如下:

#-*- coding:utf-8 -*-
importos
from celery importCelery
from django.conf importsettings
from django.template importloader,RequestContext
### 注意!必须引入Django环境!为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoCeleryDemo.settings')
## 创建应用
#定义celery对象
#123是我redis的密码;使用redis的1号库作为broker
celery_app = Celery("demo2")
## 配置应用
celery_app.conf.update(
    #使用redis的库1作为消息队列
    BROKER_URL="redis://:123@127.0.0.1:6800/1",
    #使用项目数据库存储任务执行结果
    CELERY_RESULT_BACKEND='django-db',
    #把结果存在redis的库2中
    #CELERY_RESULT_BACKEND = "redis://:123@127.0.0.1:6800/2",
    #配置定时器模块,定时器信息存储在数据库中
    CELERYBEAT_SCHEDULER='django_celery_beat.schedulers.DatabaseScheduler',
)
### 定时任务
@celery_app.task
definterval_task():
    print("我每隔5秒执行一次...")
# 作为测试,这里返回666
return 666

由于定时器信息存储在数据库中,我们需要先生成对应表, 对diango_celery_beat执行迁移操作,创建对应表:

python3 manage.py migrate django_celery_beat

其他的表是之前迁移的时候生成的:

异步任务分发模块Celery第11张

由于我们需要在数据库中添加数据才能使用定时任务,因此这里需要创建一下后台管理员账号:

python3 manage.py createsuperuser

然后登陆后台管理员admin界面:

异步任务分发模块Celery第12张

其中Crontabs用于定时某个具体时间执行某个任务的时间;

Intervals用于每隔多久执行任务的事件;

具体任务的执行在Periodic tasks表中创建。

我们要创建每隔5秒执行某个任务,所以在Intervals表名后面点击Add按钮:

异步任务分发模块Celery第13张

然后在Periodic tasks表名后面,点击Add按钮,添加任务:

异步任务分发模块Celery第14张

启动定时任务需要在后面加上--beat参数!

celery -A tasks.static_index worker -l info --beat

异步任务分发模块Celery第15张

结果如下:

异步任务分发模块Celery第16张

我们可以在admin的Task results表中查看结果:

其中上面3条是定时任务的结果,下面那一条是生成静态页面的结果。

异步任务分发模块Celery第17张

免责声明:文章转载自《异步任务分发模块Celery》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Resource接口,及资源VirtualBox 安装 centos7下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

redis的LRU算法(一)

最近加班比较累,完全不想写作了。。 刚看到一篇有趣的文章,是redis的作者antirez对redis的LRU算法的回顾。LRU算法是Least Recently Used的意思,将最近最少使用的资源丢掉。Redis经常被用作cache,如果能够将不常用的key移除,尽量保留常用的,那内存的利用率就相当高了。当然,LRU也有弱点,考虑下面一种情况: ~~~...

SpringBoot整合Redis乱码原因及解决方案

问题描述:springboot使用spring data redis存储数据时乱码 redis key/value 出现xACxEDx00x05tx00x05 问题分析: 查看RedisTemplate类 JdkSerializationRedisSerializer类 SerializingConverter类 DefaultSerialize...

redis哨兵模式

一、引言             上一篇文章我们详细的讲解了Redis的主从集群模式,其实这个集群模式配置很简单,只需要在Slave的节点上进行配置,Master主节点的配置不需要做任何更改,但是有一点,Master和Slave两个节点的持久化配置尽量保持一致,否则会有奇怪的问题出现。从今天开始我们开始讲Redis集群模式的第二模式,也就是“哨兵”模式,...

Redis----windows下的常用命令二

Redis 是一个开源,高级的键值对的存储。它经常作为服务端的数据结构,它的键的数据类型能够是strings, hashs, lists, sets(无序集合) 和 sorted sets(有序集合).启动redis服务:redis-server.exe redis.windows.conf启用客户端:redis-cli.exe双击 redis-cli.e...

redis maxmemory设置

关于maxmemory的设置,如果redis的应用场景是作为db使用,那不要设置这个选项,因为db是不能容忍丢失数据的。 如果作为cache使用,则可以启用这个选项(其实既然有淘汰策略,那就是cache了。。。) 指定Redis最大内存限制,Redis在启动时会把数据加载到内存中,达到最大内存后,Redis会先尝试清除已到期或即将到期的Key, #...

SpringBoot整合Jedis

一、说明   Spring中可以配置RedisTemplate来操作Redis,但是本文中并没有使用RedisTemplate,而是单纯的使用Spring的IoC,单独创建一个配置类,用来配置Redis,然后在需要进行Redis操作的地方,注入配置的Jedis即可。   也就是说,本文中的内容,单纯地使用Jedis,其实和普通java项目配置Redis并没...