Python Celery分布式任务队列的安装与介绍(基于Redis)

摘要:
Celery是一个基于Python编写的分布式任务队列。对任务(耗时的任务和计划的任务)的异步处理可以通过对Celery的简单操作来实现。Celery的安装从Celery 4.0开始,并且不支持在Windows平台1.1上安装celerypipinstall-U“Celery[redis]”。注意:在Windows上安装后,可能会出现以下错误:

Celery是一个基于Python编写的分布式任务队列(Distributed Task Queue), 通过对Celery进行简单操作就可以实现任务(耗时任务, 定时任务)的异步处理

一. Celery的安装

Celery4.0版本开始,不支持windows平台

1.1 通过pip方式安装celery

pip install -U "Celery[redis]"

注意事项:

  在windows上安装后,可能会出现如下报错:

ValueError: '__name__' in __slots__ conflicts with class variable

   此时先卸载celery, 然后尝试通过如下命令重新进行安装

pip install -U https://github.com/celery/py-amqp/zipball/master
pip install -U https://github.com/celery/billiard/zipball/master
pip install -U https://github.com/celery/kombu/zipball/master
pip install -U https://github.com/celery/celery/zipball/master
pip install -U "Celery[redis]"

1.2 给celery创建一个软连接

ln -s ~/.venv/project_dj/bin/celery /usr/bin/celery

1.3 执行celery命令

[root@localhost ~]$ celery --help
Options:
  -A, --app APPLICATION
  -b, --broker TEXT
  --result-backend TEXT
  --loader TEXT
  --config TEXT
  --workdir PATH
  -C, --no-color
  -q, --quiet
  --version
  --help                 Show this message and exit.
Commands:
  amqp     AMQP Administration Shell.
  beat     Start the beat periodic task scheduler.
  call     Call a task by name.
  control  Workers remote control.
  events   Event-stream utilities.
  graph    The ``celery graph`` command.
  inspect  Inspect the worker at runtime.
  list     Get info from broker.
  logtool  The ``celery logtool`` command.
  migrate  Migrate tasks from one broker to another.
  multi    Start multiple worker instances.
  purge    Erase all messages from all known task queues.
  report   Shows information useful to include in bug-reports.
  result   Print the return value for a given task id.
  shell    Start shell session with convenient access to celery symbols.
  status   Show list of workers that are online.
  upgrade  Perform upgrade between versions.
  worker   Start worker instance.
二. Celery的基本使用

2.1 创建celery应用, 并定义任务

# -*- coding: utf-8 -*-
# @Time    : 2021/5/24 11:20
# @Author  : chinablue
# @File    : task.py


from celery import Celery

# 创建一个app(Celery实例),作为所有celery操作的切入点
broker_url = f"redis://:123456@127.0.0.1:6379/5"
backend_url = f"redis://:123456@127.0.0.1:6379/6"
app = Celery("tasks", broker=broker_url, backend=backend_url)


# 定义一个任务
@app.task
def add(x, y):
    return x + y

事项说明:

  1) 创建Celery实例时,需要指定一个消息代理(broker)来接收和发送任务消息. 本文使用的是Redis(docker redis搭建)

  2) broker和backend参数的格式: redis://:password@hostname:port/db_number

2.2 启动celery worker服务端

celery -A tasks worker --loglevel=INFO

事项说明:

  1) 在生产环境中, 会使用supervisor工具将celery服务作为守护进程在后台运行

2.3 调用任务

打开终端, 进入python命令行模式:

>>> result = add.delay(4, 4)
>>> result = add.apply_async((4, 4), countdown=5)

事项说明:

  1) add.apply_async((4, 4)) 可以简写为 add.delay(4, 4)

  2) add.apply_async((4, 4), countdown=5) 表示任务发出5秒后再执行

2.4 追踪任务信息

若想获取每个任务的执行信息,在创建Celery实例时, 需要指定一个后端(backend). 本文使用的是Redis(docker redis搭建)

result = add.delay(4, 4)        
result.ready()       # 任务状态: 进行中, 已完成
result.failed()      # 任务完成, 任务失败
result.successful()  # 任务完成, 任务成功
result.state         # 任务状态: PENDING, STARTED, SUCCESS
result.get()         # 获取任务的返回值        
result.get(timeout=10)
result.get(propagate=False)  # 如果任务引发了异常, propagate=False表示异常不会被抛出来(默认情况会抛出来)
result.id            # 任务id 

注意事项:

  1) 在celery中,如果想配置backend参数,有如下三种方式

Python Celery分布式任务队列的安装与介绍(基于Redis)第1张Python Celery分布式任务队列的安装与介绍(基于Redis)第2张
# -*- coding: utf-8 -*-
# @Time    : 2021/5/24 11:20
# @Author  : chinablue
# @File    : task.py


from celery import Celery

# 创建一个app(Celery实例),作为所有celery操作的切入点
broker_url = f"redis://:123456@127.0.0.1:6379/5"
backend_url = f"redis://:123456@127.0.0.1:6379/6"
app = Celery("tasks", broker=broker_url, backend=backend_url)


# 定义一个任务
@app.task
def add(x, y):
    return x + y
方式1: 实例化Celery时传入
Python Celery分布式任务队列的安装与介绍(基于Redis)第3张Python Celery分布式任务队列的安装与介绍(基于Redis)第4张
# -*- coding: utf-8 -*-
# @Time    : 2021/5/24 11:20
# @Author  : chinablue
# @File    : task.py


from celery import Celery

broker_url = f"redis://:123456@127.0.0.1:6379/5"
backend_url = f"redis://:123456@127.0.0.1:6379/6"
app = Celery("tasks")

app.conf.update({
    "broker_url": broker_url,
    "result_backend": backend_url,
})


# 定义一个任务
@app.task
def add(x, y):
    return x + y
方式2: 通过conf的update方法
Python Celery分布式任务队列的安装与介绍(基于Redis)第5张Python Celery分布式任务队列的安装与介绍(基于Redis)第6张
# -*- coding: utf-8 -*-
# @Time    : 2021/5/24 11:20
# @Author  : chinablue
# @File    : task.py


from celery import Celery

broker_url = f"redis://:123456@127.0.0.1:6379/5"
backend_url = f"redis://:123456@127.0.0.1:6379/6"
app = Celery("tasks")

app.conf.broker_url = broker_url
app.conf.result_backend = backend_url


# 定义一个任务
@app.task
def add(x, y):
    return x + y
方式3: 通过conf属性传递参数

免责声明:文章转载自《Python Celery分布式任务队列的安装与介绍(基于Redis)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇【嵌入式开发】 Bootloader 详解 ( 代码环境 | ARM 启动流程 | uboot 工作流程 | 架构设计)Eclipse:Eclipse平台技术概述下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

CentOS7为php7.2安装php-redis扩展(redis环境搭建二)

安装前检查 安装前查看phpinfo()输出的版本与php -v 的版本是否一致,如果不一致安装成功后,输出phpinfo和php -v 都会找不到redis扩展,本人在此踩过坑 解决php -v查看到版本于phpinfo()打印的版本不一致问题可参考我的另一篇博客:https://www.cnblogs.com/clubs/p/13377676.html...

在ASP.NET WebAPI 中使用缓存【Redis】

初步看了下CacheCow与OutputCache,感觉还是CacheOutput比较符合自己的要求,使用也很简单 PM>Install-Package Strathweb.CacheOutput.WebApi2 基础使用 CacheOutput特性 [Route("get")] [CacheOutput(Cli...

Redis 占用Windows系统盘空间23G

Redis常出现问题总结: 1、当出现修改--maxheap and --heapdir 在启动这两个版本时都会创建一个 RedisQFork.dat文件,我不确定 RedisQFork 文件是否变小一点, 但我确定, 你可以通过设置Redis启动参数 heapdir 来调整这个文件的位置。 我在 redis.windows.conf文件中搜索"heapd...

python结合redis模拟队列

实在无聊就写了个很小的python程序用来实现模拟redis队列的代码如下: redis_lpush.py   #!/usr/bin/python3 import time import redis   def handle(info):     print(info)     time.sleep(2)   def main():     pool =...

七、企业级的redis数据备份和各种灾难下的数据恢复,是怎么做得呢?

1、企业级的持久化的配置策略 在企业中,RDB的生成策略,用默认的也差不多 save 60 10000:如果你希望尽可能确保说,RDB最多丢1分钟的数据,那么尽量就是每隔1分钟都生成一个快照,低峰期,数据量很少,也没必要 10000->生成RDB,1000->RDB,这个根据你自己的应用和业务的数据量,你自己去决定 AOF一定要打开,fsync...

缓存的应用场景以及要注意的问题

什么是缓存(cache): 在项目中没有必要每次请求都查询数据库的情况就可以使用缓存,让每次请求先查询缓存,如果命中,就直接返回缓存结果,如果没有命中,就查询数据库, 并将查询结果放入缓存,下次请求时查询缓存命中,直接返回结果,就不用再次查询数据库。 缓存的作用? 缓和较慢存储的高频请求,缓解数据库压力,提升响应速率。 为什么缓存可以提高响应速度? 因为缓...