前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【译】Celery文档2:Next Steps——在项目中使用Celery

【译】Celery文档2:Next Steps——在项目中使用Celery

作者头像
一只大鸽子
发布2024-05-10 19:01:40
570
发布2024-05-10 19:01:40
举报

https://docs.celeryq.dev/en/latest/getting-started/next-steps.html#next-steps

在项目中使用Celery

Project

项目文件结构:

代码语言:javascript
复制
src/
    proj/__init__.py
        /celery.py
        /tasks.py

proj/celery.py

代码语言:javascript
复制
from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='rpc://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

Celery的参数:

  • broker:代理
  • backend:结果后端
  • includeworker启动时要导入的模块列表。

然后我们定义了一些函数,并注册为任务@app.taskproj/tasks.py

代码语言:javascript
复制
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

启动 worker

使用 celery 程序来启动 worker(需要在 proj上层目录(即src)下运行)

代码语言:javascript
复制
celery -A proj worker -l INFO

停止worker Ctrl+C

后台运行worker: 守护程序脚本使用 celery multi 命令在后台启动一个或多个工作线程: celery multi start w1 -A proj -l INFO

您也可以重启celery multi restart w1 -A proj -l INFO停止 celery multi stop w1 -A proj -l INFO stop 命令是异步的,因此它不会等待工作线程关闭。您可能希望改用该 stopwait 命令,该命令可确保在退出之前完成所有当前正在执行的任务: celery multi stopwait w1 -A proj -l INFO

默认情况下,它将在当前目录中创建 pid 和日志文件。为了防止多个工作线程相互叠加启动,建议您将这些工作线程放在一个专用目录中:

代码语言:javascript
复制
mkdir -p /var/run/celery
mkdir -p /var/log/celery
celery multi start w1 -A proj -l INFO --pidfile=/var/run/celery/%n.pid \
                                        --logfile=/var/log/celery/%n%I.log

使用 multi 命令,您可以启动多个 worker,并且还有一个强大的命令行语法来指定不同 worker 的参数,例如:

代码语言:javascript
复制
celery multi start 10 -A proj -l INFO -Q:1-3 images,video -Q:4,5 data \
    -Q default -L:4,5 debug

Calling Tasks(调用任务)

  1. 1. 可以使用delay()调用任务
代码语言:javascript
复制
from proj.tasks import add
add.delay(2, 2)

delay方法实际上是apply_async()的快捷方式,add.delay(2, 2)相当于add.apply_async((2, 2))。 2. apply_async()允许更多的选择,如运行时间(countdown),队列(queue):

代码语言:javascript
复制
add.apply_async((2, 2), queue='lopri', countdown=10)
  1. 1. 直接调用任务将在当前进程中执行任务,因此不会发送任何消息:
代码语言:javascript
复制
add(2, 2)

delayapply_async 方法返回一个 AsyncResult 实例,可用于跟踪任务执行状态。但为此,您需要启用结果后端(result backend),以便状态可以存储在某个地方。

配置了结果后端,就可通过res.get()获得任务的返回值:

代码语言:javascript
复制
res = add.delay(2, 2)
res.get(timeout=1) 

如果任务引发异常,您还可以检查异常并回溯, result.get() 默认情况下会传播错误(Trackback...)。

要检查任务是成功还是失败,您必须在结果实例上使用相应的方法:

代码语言:javascript
复制
res.failed()
#True

res.successful()
#False

可以通过查看任务state判断任务是否失败:

代码语言:javascript
复制
res.state
#'FAILURE'

任务的状态在成功执行的情况下会这样变化:

代码语言:javascript
复制
PENDING -> STARTED -> SUCCESS

如果重试任务,则各个阶段可能会变得更加复杂。为了演示,对于重试两次的任务,阶段将是:

代码语言:javascript
复制
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

Canvas:设计工作流

您刚刚学会了如何使用delay方法调用任务。但有时您可能希望将任务调用的签名(signature)传递给另一个进程,或者作为参数传递给另一个函数,Celery 为此使用了一种称为签名(signature)的东西。 签名包装了单个任务调用的参数执行选项,使其可以传递给函数,甚至可以序列化并通过网络发送。

您可以使用参数 (2, 2) 和countdiwb=10 秒为 add 任务创建签名:

代码语言:javascript
复制
add.signature((2, 2), countdown=10)
#tasks.add(2, 2)

还有一个快捷方式来创建签名:

代码语言:javascript
复制
add.s(2, 2)

And there’s that calling API again…

签名实例还支持calling API,因此它具有delay和apply_async方法。

代码语言:javascript
复制
s1 = add.s(2, 2)
res = s1.delay()
res.get()

你也可以制作不完整的签名:

代码语言:javascript
复制
# incomplete partial: add(?, 2)
s2 = add.s(2)

res = s2.delay(8)
res.get()
10

也可以在签名中添加关键字参数

代码语言:javascript
复制
3 = add.s(2, 2, debug=True)
s3.delay(debug=False)   # debug is now False.

这看上去很好,但是究竟能用来干什么呢?为此,需要介绍Canvas的一些基元。

The primitives

  • • group
  • • chain
  • • chord
  • • map
  • • starmap
  • • chunks 这些基元都是签名对象,因此它们可以组合在一起,组成复杂的工作流。

group

group并行调用任务列表,并返回一个特殊的结果实例,该实例允许你将结果作为组进行检查,并按顺序检索返回值。

代码语言:javascript
复制
from celery import group
from proj.tasks import add

group(add.s(i, i) for i in range(10))().get()
#[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Partial group

代码语言:javascript
复制
g = group(add.s(i) for i in range(10))
g(10).get()
#[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

chain 任务可以链式调用,调用完一个任务后继续调用另一个任务:

代码语言:javascript
复制
from celery import chain
from proj.tasks import add, mul

# (4 + 4) * 8
chain(add.s(4, 4) | mul.s(8))().get()
64

partial chain:

代码语言:javascript
复制
#(? + 4) * 8
g = chain(add.s(4) | mul.s(8))
g(4).get()
64

链也可以这样写: (add.s(4, 4) | mul.s(8))().get()

chord chord是具有回调的group:

代码语言:javascript
复制
from celery import chord
from proj.tasks import add, xsum

chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90

链接到另一个任务的组将自动转换为chord:

代码语言:javascript
复制
(group(add.s(i, i) for i in range(10)) | xsum.s())().get()
90

由于这些基元都是签名类型,因此它们几乎可以随心所欲地组合,例如:

代码语言:javascript
复制
upload_document.s(file) | group(apply_filter.s() for filter in filters)

Routing(路由)

Celery 支持 AMQP 提供的所有路由工具,但它也支持将消息发送到指定队列的简单路由。 task_routes 设置使您能够按名称路由任务,并将所有内容集中在一个位置:

代码语言:javascript
复制
app.conf.update(
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

您还可以在运行时指定队列,方法是指定apply_async的queue参数:

代码语言:javascript
复制
from proj.tasks import add
add.apply_async((2, 2), queue='hipri')

然后,您可以通过指定 celery worker -Q 选项使worker从此队列中consume:

代码语言:javascript
复制
celery -A proj worker -Q hipri

您可以使用逗号分隔的列表指定多个队列。例如,您可以让辅worker同时consume默认队列(celery)和 hipri 队列

代码语言:javascript
复制
celery -A proj worker -Q hipri,celery

Remote Control 远程控制

如果您使用 RabbitMQ (AMQP)、Redis 或 Qpid 作为代理,则可以在运行时控制和检查工作线程。 例如,您可以查看worker正在处理的任务:

代码语言:javascript
复制
celery -A proj inspect active

这是通过使用广播消息来实现的,因此集群中的每个工作线程都会接收所有远程控制命令。

可用 --destination 选项指定worker对请求执行操作。以下是以逗号分隔的工作器主机名列表:

代码语言:javascript
复制
celery -A proj inspect active --destination=celery@example.com

Monitoring Guide[1]

Timezone

日期和时间,内部和消息中都使用UTC时区。 当worker受到消息时,会将UTC时间转换成本地时间。 (通常不需要手动设置时区)可以通过timezone设置时区:

代码语言:javascript
复制
app.conf.timezone = 'Europe/London'

Optimization

默认配置未针对吞吐量进行优化。默认情况下,它尝试在许多短任务和较少的长任务之间走中间路线,这是吞吐量和公平计划之间的折衷。

What's now?

现在您已经阅读了本文档,您应该继续阅读用户指南[2]

还有一个API参考[3],可能会有用。

引用链接

[1] Monitoring Guide: https://docs.celeryq.dev/en/latest/userguide/monitoring.html#guide-monitoring [2] 用户指南: https://docs.celeryq.dev/en/latest/userguide/index.html#guide [3] API参考: https://docs.celeryq.dev/en/latest/reference/index.html#apiref

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-04-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 一只大鸽子 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 在项目中使用Celery
    • Project
      • 启动 worker
      • Calling Tasks(调用任务)
      • Canvas:设计工作流
        • And there’s that calling API again…
          • The primitives
            • 引用链接
        • Routing(路由)
        • Remote Control 远程控制
        • Timezone
        • Optimization
        • What's now?
        相关产品与服务
        云数据库 Redis
        腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档